package com.google.cloud.bigquery.connector.common;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/google/cloud/bigquery/connector/common/ParallelArrowReader.class */
/**
 * Reads from several {@link ArrowReader}s concurrently and hands their batches to a single
 * consumer through {@link #next()}.
 *
 * <p>A daemon thread (started from the constructor) repeatedly submits one read task per
 * still-active reader to the supplied {@link ExecutorService}. Each task loads the reader's next
 * batch, unloads it into an {@link ArrowRecordBatch} and offers it to a bounded queue. {@link
 * #next()} takes one item from that queue and loads it into the supplied {@link VectorLoader}.
 *
 * <p>Back-pressure: a semaphore with one permit per reader is acquired before every task
 * submission and released when {@link #next()} takes an item (or when a task finds its reader
 * exhausted). The queue itself is sized {@code readers + 2}.
 *
 * <p>{@link #close()} shuts down the supplied executor.
 */
public class ParallelArrowReader implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(ParallelArrowReader.class);

    /** Queue marker signalling that the reader thread has finished scheduling work. */
    private static final Object DONE_SENTINEL = new Object();

    /** Holds {@link ArrowRecordBatch} results, {@link Throwable} failures, or {@link #DONE_SENTINEL}. */
    private final BlockingQueue<Object> queue;

    /** Limits the number of scheduled-but-unconsumed read tasks. */
    private final Semaphore queueSemaphore;

    private final List<ArrowReader> readers;
    private final ExecutorService executor;
    private final VectorLoader loader;
    private final BigQueryStorageReadRowsTracer rootTracer;

    /** One forked tracer per reader, indexed like {@link #readers}. */
    private final BigQueryStorageReadRowsTracer[] tracers;

    /** Loop condition of the reader thread; forced to zero on failure and on {@link #close()}. */
    private final AtomicInteger readersReady;

    private Thread readerThread;

    /**
     * Creates the reader and immediately starts the background scheduling thread.
     *
     * @param list delegate readers to consume in parallel
     * @param executorService executor that runs the per-reader read tasks; shut down by {@link
     *     #close()}
     * @param vectorLoader loader that receives each batch returned through {@link #next()}
     * @param bigQueryStorageReadRowsTracer tracer for the consumer side; forked once per reader
     *     with the prefix {@code reader-thread-<index>}
     */
    public ParallelArrowReader(List<ArrowReader> list, ExecutorService executorService, VectorLoader vectorLoader, BigQueryStorageReadRowsTracer bigQueryStorageReadRowsTracer) {
        this.readers = list;
        // Diamond instead of the raw type avoids an unchecked assignment.
        this.queue = new ArrayBlockingQueue<>(list.size() + 2);
        this.executor = executorService;
        this.loader = vectorLoader;
        this.rootTracer = bigQueryStorageReadRowsTracer;
        this.queueSemaphore = new Semaphore(list.size());
        this.readersReady = new AtomicInteger(list.size());
        this.tracers = new BigQueryStorageReadRowsTracer[list.size()];
        for (int i = 0; i < list.size(); i++) {
            this.tracers[i] = this.rootTracer.forkWithPrefix("reader-thread-" + i);
        }
        start();
    }

    /**
     * Blocks until the next batch is available and loads it into the {@link VectorLoader}.
     *
     * @return {@code true} if a batch was loaded; {@code false} if the end marker was received or
     *     the calling thread was interrupted while waiting (the interrupt status is restored in
     *     that case)
     * @throws IOException if a background task or the reader thread reported a failure;
     *     non-{@code IOException} failures are wrapped
     */
    public boolean next() throws IOException {
        this.rootTracer.nextBatchNeeded();
        this.rootTracer.readRowsResponseRequested();

        Object item;
        try {
            item = this.queue.take();
        } catch (InterruptedException e) {
            log.info("Interrupted when waiting for next batch.");
            // Preserve the interrupt so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            return false;
        }
        // A slot has been consumed; allow the reader thread to schedule another task.
        this.queueSemaphore.release();

        if (item == DONE_SENTINEL) {
            return false;
        }
        if (item instanceof Throwable) {
            if (item instanceof IOException) {
                throw (IOException) item;
            }
            throw new IOException((Throwable) item);
        }
        Preconditions.checkState(item instanceof ArrowRecordBatch, "Expected future object");
        ArrowRecordBatch batch = (ArrowRecordBatch) item;
        this.rootTracer.readRowsResponseObtained(0L);
        try {
            this.rootTracer.rowsParseStarted();
            this.loader.load(batch);
            this.rootTracer.rowsParseFinished(batch.getLength());
        } finally {
            // Always release the batch, even when loading fails.
            batch.close();
        }
        return true;
    }

    /** Starts the daemon thread that runs {@link #consumeReaders()} and marks the root stream started. */
    private void start() {
        this.readerThread = new Thread(this::consumeReaders);
        this.readerThread.setDaemon(true);
        this.readerThread.start();
        this.rootTracer.startStream();
    }

    /**
     * Body of the reader thread: keeps submitting one read task per active reader until no reader
     * is ready, then enqueues {@link #DONE_SENTINEL}. Any failure on this thread (including an
     * interrupt while waiting for a permit) is logged and enqueued so {@link #next()} surfaces it.
     */
    private void consumeReaders() {
        try {
            final int readerCount = this.readers.size();
            // Per-reader state, indexed like this.readers.
            AtomicBoolean[] hasData = new AtomicBoolean[readerCount];
            long[] lastBytesRead = new long[readerCount];
            VectorUnloader[] unloaders = new VectorUnloader[readerCount];
            VectorSchemaRoot[] roots = new VectorSchemaRoot[readerCount];
            for (int i = 0; i < readerCount; i++) {
                hasData[i] = new AtomicBoolean(true);
                lastBytesRead[i] = 0;
                roots[i] = this.readers.get(i).getVectorSchemaRoot();
                unloaders[i] = new VectorUnloader(roots[i], true, false);
                this.tracers[i].startStream();
            }

            while (this.readersReady.get() > 0) {
                for (int i = 0; i < readerCount; i++) {
                    if (!hasData[i].get()) {
                        continue;
                    }
                    final ArrowReader reader = this.readers.get(i);
                    final int idx = i;
                    // Wait for a free slot before scheduling more work.
                    this.queueSemaphore.acquire();
                    this.executor.submit(() -> {
                        // Tasks for the same reader are serialized on that reader's root.
                        synchronized (roots[idx]) {
                            if (!hasData[idx].get()) {
                                return;
                            }
                            try {
                                this.tracers[idx].readRowsResponseRequested();
                                hasData[idx].set(reader.loadNextBatch());
                                if (!hasData[idx].get()) {
                                    // Nothing will be enqueued for this task; hand the permit back.
                                    this.queueSemaphore.release();
                                }
                                this.tracers[idx].readRowsResponseObtained(reader.bytesRead() - lastBytesRead[idx]);
                                lastBytesRead[idx] = reader.bytesRead();
                            } catch (Throwable t) {
                                log.info("Exception caught while consuming reader.", t);
                                hasData[idx].set(false);
                                // Stop the scheduling loop for every reader.
                                this.readersReady.set(0);
                                Preconditions.checkState(this.queue.offer(t), "Expected space in queue");
                            }
                            if (!hasData[idx].get()) {
                                this.readersReady.addAndGet(-1);
                                return;
                            }
                            int rowCount = 0;
                            try {
                                rowCount = reader.getVectorSchemaRoot().getRowCount();
                            } catch (IOException e) {
                                this.queue.offer(e);
                            }
                            this.tracers[idx].rowsParseStarted();
                            ArrowRecordBatch recordBatch = unloaders[idx].getRecordBatch();
                            this.tracers[idx].rowsParseFinished(rowCount);
                            try {
                                Preconditions.checkState(this.queue.offer(recordBatch), "Expected space in queue");
                            } catch (Exception e) {
                                // The batch was not handed over; release it before propagating.
                                recordBatch.close();
                                throw e;
                            }
                        }
                    });
                }
            }
            Preconditions.checkState(this.queue.offer(DONE_SENTINEL), "Expected available capacity");
        } catch (Throwable th) {
            log.info("Read ahead caught exceptions", th);
            Preconditions.checkState(this.queue.offer(th), "Expected available capacity");
        }
    }

    /**
     * Stops the reader thread and the executor (waiting up to 10 seconds for each), closes any
     * batches still sitting in the queue, finishes all tracers and closes the delegate readers
     * via {@code close(false)}.
     *
     * <p>If the calling thread is interrupted while waiting, shutdown continues and the interrupt
     * status is restored before this method returns.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        this.rootTracer.finished();
        // Deferred so that a re-asserted interrupt does not cut the later waits short.
        boolean interrupted = false;
        try {
            if (this.readerThread != null) {
                this.readersReady.set(0);
                this.readerThread.interrupt();
                try {
                    this.readerThread.join(10000L);
                } catch (InterruptedException e) {
                    log.info("Interrupted while waiting for reader thread to finish.");
                    interrupted = true;
                }
                if (this.readerThread.isAlive()) {
                    log.warn("Reader thread did not shutdown in 10 seconds.");
                } else {
                    log.info("Reader thread stopped.  Queue size: {}", this.queue.size());
                }
            }

            this.executor.shutdownNow();
            try {
                if (!this.executor.awaitTermination(10L, TimeUnit.SECONDS)) {
                    log.warn("executor did not terminate after 10 seconds");
                }
            } catch (InterruptedException e) {
                log.info("Interrupted when awaiting executor termination");
                interrupted = true;
            }

            // Release batches that were produced but never consumed by next().
            for (Object pending : this.queue) {
                if (pending instanceof ArrowRecordBatch) {
                    ((ArrowRecordBatch) pending).close();
                }
            }

            for (BigQueryStorageReadRowsTracer tracer : this.tracers) {
                tracer.finished();
            }

            for (ArrowReader reader : this.readers) {
                try {
                    reader.close(false);
                } catch (Exception e) {
                    log.info("Trouble closing delegate readers", e);
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
