/*
 * Decompiled with CFR 0.152.
 */
package com.databricks.jdbc.api.impl.arrow;

import com.databricks.internal.sdk.service.sql.BaseChunkInfo;
import com.databricks.jdbc.api.impl.arrow.AbstractArrowResultChunk;
import com.databricks.jdbc.api.impl.arrow.ChunkDownloadManager;
import com.databricks.jdbc.api.impl.arrow.ChunkLinkDownloadService;
import com.databricks.jdbc.api.impl.arrow.ChunkProvider;
import com.databricks.jdbc.api.internal.IDatabricksSession;
import com.databricks.jdbc.api.internal.IDatabricksStatementInternal;
import com.databricks.jdbc.common.CompressionCodec;
import com.databricks.jdbc.common.util.DatabricksThreadContextHolder;
import com.databricks.jdbc.common.util.DatabricksThriftUtil;
import com.databricks.jdbc.dbclient.IDatabricksHttpClient;
import com.databricks.jdbc.dbclient.impl.common.StatementId;
import com.databricks.jdbc.exception.DatabricksSQLException;
import com.databricks.jdbc.log.JdbcLogger;
import com.databricks.jdbc.log.JdbcLoggerFactory;
import com.databricks.jdbc.model.client.thrift.generated.TFetchResultsResp;
import com.databricks.jdbc.model.client.thrift.generated.TRowSet;
import com.databricks.jdbc.model.client.thrift.generated.TSparkArrowResultLink;
import com.databricks.jdbc.model.core.ExternalLink;
import com.databricks.jdbc.model.core.ResultData;
import com.databricks.jdbc.model.core.ResultManifest;
import com.databricks.jdbc.model.telemetry.enums.DatabricksDriverErrorCode;
import com.databricks.jdbc.telemetry.latency.TelemetryCollector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

/**
 * Base class for chunk providers that keep their Arrow result chunks in a map keyed by chunk
 * index and hand them out one at a time, in index order.
 *
 * <p>This class owns the bookkeeping (chunk count, row count, current position, in-memory
 * budget) and the link-download service. Subclasses supply the concrete chunk type through the
 * {@code createChunk} factories. The download scheduling method {@code downloadNextChunks()} is
 * called here but not defined in this class (presumably declared by {@link ChunkDownloadManager}
 * and implemented by subclasses - confirm against that interface).
 *
 * <p>NOTE(review): both constructors end by calling {@code downloadNextChunks()}, i.e. an
 * overridable method runs before subclass constructors have finished. Subclass implementations
 * must not rely on their own fields being initialized at that point.
 *
 * @param <T> concrete chunk type managed by this provider
 */
public abstract class AbstractRemoteChunkProvider<T extends AbstractArrowResultChunk>
        implements ChunkProvider, ChunkDownloadManager {
    private static final JdbcLogger LOGGER = JdbcLoggerFactory.getLogger(AbstractRemoteChunkProvider.class);

    protected final IDatabricksSession session;
    protected final StatementId statementId;
    protected final IDatabricksHttpClient httpClient;
    protected final CompressionCodec compressionCodec;
    /** All known chunks, keyed by their chunk index. */
    protected final ConcurrentMap<Long, T> chunkIndexToChunksMap;
    /** Total number of chunks in the result. */
    protected long chunkCount;
    /** Total number of rows in the result. */
    protected long rowCount;
    /** Index of the chunk currently exposed by {@link #getChunk()}; {@code -1} before the first {@link #next()}. */
    protected long currentChunkIndex;
    /** Initialised to 0 here; presumably advanced by the download scheduling code in subclasses. */
    protected long nextChunkToDownload;
    /** Decremented here when a chunk is released; presumably incremented by subclasses when a download is scheduled. */
    protected long totalChunksInMemory;
    /** The smaller of {@link #maxParallelChunkDownloadsPerQuery} and {@link #chunkCount}. */
    protected long allowedChunksInMemory;
    /** Never assigned in this class; subclasses are expected to maintain it (e.g. from {@link #doClose()}). */
    protected boolean isClosed;
    protected final int maxParallelChunkDownloadsPerQuery;
    protected final ChunkLinkDownloadService<T> linkDownloadService;
    /** Read from the session's connection context at construction time. */
    protected final int chunkReadyTimeoutSeconds;

    /**
     * Creates a provider from a result manifest and its accompanying result data.
     *
     * <p>Chunk and row counts are taken from the manifest. One chunk is created per manifest entry,
     * and any external links already present in {@code resultData} are attached to their chunks.
     *
     * @param statementId statement the result belongs to
     * @param resultManifest manifest describing every chunk of the result
     * @param resultData result data, optionally carrying external links for some chunks
     * @param session owning session
     * @param httpClient HTTP client stored for use by subclasses
     * @param maxParallelChunkDownloadsPerQuery upper bound used for the in-memory chunk budget
     * @param compressionCodec codec reported by {@link #getCompressionCodec()}
     * @throws DatabricksSQLException if a chunk cannot be created or the initial download scheduling fails
     */
    protected AbstractRemoteChunkProvider(StatementId statementId, ResultManifest resultManifest, ResultData resultData, IDatabricksSession session, IDatabricksHttpClient httpClient, int maxParallelChunkDownloadsPerQuery, CompressionCodec compressionCodec) throws DatabricksSQLException {
        this.chunkReadyTimeoutSeconds = session.getConnectionContext().getChunkReadyTimeoutSeconds();
        this.maxParallelChunkDownloadsPerQuery = maxParallelChunkDownloadsPerQuery;
        this.session = session;
        this.httpClient = httpClient;
        this.statementId = statementId;
        this.compressionCodec = compressionCodec;
        this.chunkCount = resultManifest.getTotalChunkCount();
        this.rowCount = resultManifest.getTotalRowCount();
        this.chunkIndexToChunksMap = this.initializeChunksMap(resultManifest, resultData, statementId);
        // Last argument: number of links supplied up front, or 1 when none were supplied.
        // Its exact meaning is defined by ChunkLinkDownloadService (not visible here).
        long initialLinkCount = resultData.getExternalLinks() != null ? (long) resultData.getExternalLinks().size() : 1L;
        this.linkDownloadService = new ChunkLinkDownloadService<T>(session, statementId, this.chunkCount, this.chunkIndexToChunksMap, initialLinkCount);
        TelemetryCollector.getInstance().recordTotalChunks(statementId, this.chunkCount);
        this.initializeData();
    }

    /**
     * Creates a provider from a Thrift fetch response.
     *
     * <p>All result pages are fetched eagerly (while the response reports more rows), and one chunk
     * is created per result link. {@link #chunkCount} and {@link #rowCount} are accumulated while
     * doing so.
     *
     * @param parentStatement statement the result belongs to; also used to fetch further pages
     * @param resultsResp first fetch response
     * @param session owning session
     * @param httpClient HTTP client stored for use by subclasses
     * @param maxParallelChunkDownloadsPerQuery upper bound used for the in-memory chunk budget
     * @param compressionCodec codec reported by {@link #getCompressionCodec()}
     * @throws DatabricksSQLException if fetching pages, creating a chunk, or the initial download scheduling fails
     */
    protected AbstractRemoteChunkProvider(IDatabricksStatementInternal parentStatement, TFetchResultsResp resultsResp, IDatabricksSession session, IDatabricksHttpClient httpClient, int maxParallelChunkDownloadsPerQuery, CompressionCodec compressionCodec) throws DatabricksSQLException {
        this.chunkReadyTimeoutSeconds = session.getConnectionContext().getChunkReadyTimeoutSeconds();
        this.maxParallelChunkDownloadsPerQuery = maxParallelChunkDownloadsPerQuery;
        this.session = session;
        this.httpClient = httpClient;
        this.statementId = parentStatement.getStatementId();
        this.compressionCodec = compressionCodec;
        // Populating the map also accumulates chunkCount and rowCount, so it must run before
        // chunkCount is handed to the link download service below.
        this.chunkIndexToChunksMap = this.initializeChunksMap(resultsResp, parentStatement, session);
        this.linkDownloadService = new ChunkLinkDownloadService<T>(session, this.statementId, this.chunkCount, this.chunkIndexToChunksMap, this.chunkCount);
        this.initializeData();
    }

    /**
     * Creates a chunk from manifest chunk information.
     *
     * @param statementId statement the chunk belongs to
     * @param chunkIndex index of the chunk within the result
     * @param chunkInfo manifest entry describing the chunk
     * @return the newly created chunk
     * @throws DatabricksSQLException if the chunk cannot be created
     */
    protected abstract T createChunk(StatementId statementId, long chunkIndex, BaseChunkInfo chunkInfo) throws DatabricksSQLException;

    /**
     * Creates a chunk from a Thrift Arrow result link.
     *
     * @param statementId statement the chunk belongs to
     * @param chunkIndex index of the chunk within the result
     * @param resultLink Thrift link describing the chunk
     * @return the newly created chunk
     * @throws DatabricksSQLException if the chunk cannot be created
     */
    protected abstract T createChunk(StatementId statementId, long chunkIndex, TSparkArrowResultLink resultLink) throws DatabricksSQLException;

    /** Returns the compression codec supplied at construction time. */
    @Override
    public CompressionCodec getCompressionCodec() {
        return this.compressionCodec;
    }

    /** Returns {@code true} if at least one chunk follows the current position. */
    @Override
    public boolean hasNextChunk() {
        return this.currentChunkIndex < this.chunkCount - 1L;
    }

    /** Returns the total number of rows in the result. */
    @Override
    public long getRowCount() {
        return this.rowCount;
    }

    /** Returns the total number of chunks in the result. */
    @Override
    public long getChunkCount() {
        return this.chunkCount;
    }

    /**
     * Returns the chunk at the current position after waiting for it to become ready.
     *
     * @return the current chunk, or {@code null} if {@link #next()} has not been called yet
     * @throws DatabricksSQLException if the wait is interrupted (the thread's interrupt flag is
     *     restored), times out, or the chunk fails to become ready
     */
    public T getChunk() throws DatabricksSQLException {
        if (this.currentChunkIndex < 0L) {
            return null;
        }
        T chunk = this.chunkIndexToChunksMap.get(this.currentChunkIndex);
        try {
            chunk.waitForChunkReady();
        }
        catch (InterruptedException e) {
            LOGGER.error(e, "Caught interrupted exception while waiting for chunk [%s] for statement [%s]. Exception [%s]", chunk.getChunkIndex(), this.statementId, e.getMessage());
            Thread.currentThread().interrupt();
            throw new DatabricksSQLException("Operation interrupted while waiting for chunk ready", (Throwable)e, DatabricksDriverErrorCode.THREAD_INTERRUPTED_ERROR);
        }
        catch (ExecutionException | TimeoutException e) {
            // ExecutionException wraps the real failure in its cause; TimeoutException normally has
            // no cause at all, so fall back to the exception itself to avoid losing the diagnosis.
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            throw new DatabricksSQLException("Failed to ready chunk", cause, DatabricksDriverErrorCode.CHUNK_READY_ERROR);
        }
        return chunk;
    }

    /**
     * Releases the current chunk (if any) and advances to the next one.
     *
     * <p>The current chunk is released even when there is no further chunk to advance to.
     *
     * @return {@code true} if the position was advanced, {@code false} if no chunk remains
     * @throws DatabricksSQLException if scheduling further downloads after the release fails
     */
    @Override
    public boolean next() throws DatabricksSQLException {
        if (this.currentChunkIndex >= 0L) {
            this.releaseChunk();
        }
        if (!this.hasNextChunk()) {
            return false;
        }
        ++this.currentChunkIndex;
        return true;
    }

    /**
     * Runs the subclass hook {@link #doClose()} and then shuts down the link download service,
     * even if the hook throws.
     */
    @Override
    public final void close() {
        try {
            this.doClose();
        }
        finally {
            this.linkDownloadService.shutdown();
        }
    }

    /** Returns the {@link #isClosed} flag. */
    @Override
    public boolean isClosed() {
        return this.isClosed;
    }

    /** Returns the in-memory chunk budget computed at construction time. */
    public long getAllowedChunksInMemory() {
        return this.allowedChunksInMemory;
    }

    /** Subclass hook invoked by {@link #close()}; does nothing by default. */
    protected void doClose() {
    }

    /**
     * Resets the iteration state, computes the in-memory chunk budget and triggers the first
     * round of downloads.
     */
    private void initializeData() throws DatabricksSQLException {
        DatabricksThreadContextHolder.setStatementId(this.statementId);
        this.nextChunkToDownload = 0L;
        this.currentChunkIndex = -1L;
        this.totalChunksInMemory = 0L;
        this.allowedChunksInMemory = Math.min((long)this.maxParallelChunkDownloadsPerQuery, this.chunkCount);
        this.downloadNextChunks();
    }

    /**
     * Builds the chunk map from a result manifest: one chunk per manifest entry, with any
     * external links from {@code resultData} attached to the matching chunk.
     *
     * @return the populated map; empty when the manifest reports zero chunks
     */
    private ConcurrentMap<Long, T> initializeChunksMap(ResultManifest resultManifest, ResultData resultData, StatementId statementId) throws DatabricksSQLException {
        ConcurrentMap<Long, T> chunkIndexMap = new ConcurrentHashMap<Long, T>();
        if (resultManifest.getTotalChunkCount() == 0L) {
            return chunkIndexMap;
        }
        for (BaseChunkInfo chunkInfo : resultManifest.getChunks()) {
            LOGGER.debug("Manifest chunk information: " + chunkInfo.toString());
            chunkIndexMap.put(chunkInfo.getChunkIndex(), this.createChunk(statementId, (long)chunkInfo.getChunkIndex(), chunkInfo));
        }
        // The constructor already treats a null link list as legal, so do the same here instead
        // of failing with a NullPointerException.
        if (resultData.getExternalLinks() != null) {
            for (ExternalLink externalLink : resultData.getExternalLinks()) {
                chunkIndexMap.get(externalLink.getChunkIndex()).setChunkLink(externalLink);
            }
        }
        return chunkIndexMap;
    }

    /**
     * Builds the chunk map from a Thrift fetch response, fetching further pages for as long as
     * the latest response reports more rows, and records the final chunk count in telemetry.
     */
    private ConcurrentMap<Long, T> initializeChunksMap(TFetchResultsResp resultsResp, IDatabricksStatementInternal parentStatement, IDatabricksSession session) throws DatabricksSQLException {
        ConcurrentMap<Long, T> chunkIndexMap = new ConcurrentHashMap<Long, T>();
        this.populateChunkIndexMap(resultsResp.getResults(), chunkIndexMap);
        while (resultsResp.hasMoreRows) {
            resultsResp = session.getDatabricksClient().getMoreResults(parentStatement);
            this.populateChunkIndexMap(resultsResp.getResults(), chunkIndexMap);
        }
        TelemetryCollector.getInstance().recordTotalChunks(this.statementId, this.chunkCount);
        return chunkIndexMap;
    }

    /**
     * Adds one chunk per result link of {@code resultData} to {@code chunkIndexMap}, assigning
     * consecutive indices starting at the current {@link #chunkCount}, and adds the row set's
     * row count to {@link #rowCount}.
     */
    private void populateChunkIndexMap(TRowSet resultData, ConcurrentMap<Long, T> chunkIndexMap) throws DatabricksSQLException {
        this.rowCount += DatabricksThriftUtil.getRowCount(resultData);
        if (resultData.getResultLinks() == null) {
            // A row set without result links contributes no chunks.
            return;
        }
        for (TSparkArrowResultLink resultLink : resultData.getResultLinks()) {
            LOGGER.debug("Chunk information log - Row Offset: %s, Row Count: %s, Expiry Time: %s", resultLink.getStartRowOffset(), resultLink.getRowCount(), resultLink.getExpiryTime());
            chunkIndexMap.put(this.chunkCount, this.createChunk(this.statementId, this.chunkCount, resultLink));
            ++this.chunkCount;
        }
    }

    /**
     * Releases the current chunk; if the chunk reports that it was released, frees one slot of
     * the in-memory budget and schedules further downloads.
     */
    private void releaseChunk() throws DatabricksSQLException {
        T currentChunk = this.chunkIndexToChunksMap.get(this.currentChunkIndex);
        if (currentChunk.releaseChunk()) {
            --this.totalChunksInMemory;
            this.downloadNextChunks();
        }
    }
}

