package com.navercorp.spring.data.jdbc.plus.sql.support.template;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.jdbc.core.RowCountCallbackHandler;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcOperations;
import org.springframework.jdbc.core.namedparam.SqlParameterSource;
import org.springframework.lang.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:com/navercorp/spring/data/jdbc/plus/sql/support/template/JdbcReactiveTemplate.class */
/**
 * Exposes a blocking JDBC query as a Reactor {@link Flux}.
 *
 * <p>The query runs on a {@link Scheduler} worker (the producer) that maps each row and pushes it
 * into a bounded {@link BlockingQueue}. The returned flux (the consumer) pulls items out of that
 * queue one at a time. A shared {@link AtomicBoolean} "closed" flag lets either side tell the other
 * to stop: it is set on cancellation, on a producer failure, and on queue timeouts/interrupts.
 */
public class JdbcReactiveTemplate {
    private final Log logger;
    private final Scheduler scheduler;
    private final int defaultQueueSize;
    private final long defaultBufferTimeout;

    /**
     * Marker item signalling that the producer finished without error. Carries no payload.
     */
    private static class EndOfFluxItem<R> extends FluxItem<R> {
        private EndOfFluxItem() {
            super(null);
        }

        @Override
        protected boolean isEnd() {
            return true;
        }
    }

    /**
     * Terminal item carrying the exception that made the producer fail. Carries no payload and
     * reports itself both as an error and as the end of the stream.
     */
    public static class ErrorFluxItem<R> extends FluxItem<R> {
        Exception error;

        private ErrorFluxItem(Exception exc) {
            super(null);
            this.error = exc;
        }

        @Override
        protected boolean isError() {
            return true;
        }

        @Override
        protected boolean isEnd() {
            return true;
        }

        @Override
        public Exception getError() {
            return this.error;
        }
    }

    /**
     * Envelope placed on the hand-off queue. A plain instance wraps one mapped row; the subclasses
     * represent the terminal "end" and "error" markers.
     */
    public static class FluxItem<R> {
        /** Shared end marker; it holds no payload, so one instance serves every element type. */
        private static final FluxItem<?> END_ITEM = new EndOfFluxItem<>();
        private final R item;

        private FluxItem(R r) {
            this.item = r;
        }

        /** @return {@code true} when no further items follow this one */
        protected boolean isEnd() {
            return false;
        }

        /** @return {@code true} when this item carries a producer failure */
        protected boolean isError() {
            return false;
        }

        private R getItem() {
            return this.item;
        }

        /** @return the producer failure, or {@code null} for non-error items */
        @Nullable
        protected Exception getError() {
            return null;
        }

        private static <R> FluxItem<R> errorInstance(Exception exc) {
            return new ErrorFluxItem<>(exc);
        }
    }

    /**
     * Creates a template using {@link Schedulers#boundedElastic()}, a queue capacity of 100 and a
     * buffer timeout of 10000 milliseconds.
     */
    public JdbcReactiveTemplate() {
        this(Schedulers.boundedElastic(), 100, 10000L);
    }

    /**
     * Creates a template with explicit defaults.
     *
     * @param scheduler scheduler on which the blocking query is executed
     * @param i default capacity of the hand-off queue
     * @param j default timeout, in milliseconds, for each queue offer and poll
     */
    public JdbcReactiveTemplate(Scheduler scheduler, int i, long j) {
        this.logger = LogFactory.getLog(getClass());
        this.scheduler = scheduler;
        this.defaultQueueSize = i;
        this.defaultBufferTimeout = j;
    }

    @SuppressWarnings("unchecked")
    private static <R> FluxItem<R> endItem() {
        // Safe cast: the end marker never exposes a payload, so its type argument is irrelevant.
        return (FluxItem<R>) FluxItem.END_ITEM;
    }

    private static <R> FluxItem<R> errorItem(Exception exc) {
        return FluxItem.errorInstance(exc);
    }

    /**
     * Runs the query using this template's default scheduler, queue size and buffer timeout.
     *
     * @param str SQL to execute
     * @param namedParameterJdbcOperations JDBC operations used to run the query
     * @param sqlParameterSource named parameters for the query
     * @param rowMapper maps each result row to an element of the flux
     * @return a flux of mapped rows
     */
    public <R> Flux<R> queryFlux(String str, NamedParameterJdbcOperations namedParameterJdbcOperations, SqlParameterSource sqlParameterSource, RowMapper<R> rowMapper) {
        return queryFlux(str, namedParameterJdbcOperations, sqlParameterSource, rowMapper, this.scheduler, this.defaultQueueSize, this.defaultBufferTimeout);
    }

    /**
     * Runs the query on the given scheduler with this template's default queue size and timeout.
     *
     * @param str SQL to execute
     * @param namedParameterJdbcOperations JDBC operations used to run the query
     * @param sqlParameterSource named parameters for the query
     * @param rowMapper maps each result row to an element of the flux
     * @param scheduler scheduler on which the blocking query is executed
     * @return a flux of mapped rows
     */
    public <R> Flux<R> queryFlux(String str, NamedParameterJdbcOperations namedParameterJdbcOperations, SqlParameterSource sqlParameterSource, RowMapper<R> rowMapper, Scheduler scheduler) {
        return queryFlux(str, namedParameterJdbcOperations, sqlParameterSource, rowMapper, scheduler, this.defaultQueueSize, this.defaultBufferTimeout);
    }

    /**
     * Runs the query on this template's default scheduler with an explicit queue size and timeout.
     *
     * @param str SQL to execute
     * @param namedParameterJdbcOperations JDBC operations used to run the query
     * @param sqlParameterSource named parameters for the query
     * @param rowMapper maps each result row to an element of the flux
     * @param i capacity of the hand-off queue
     * @param j timeout, in milliseconds, for each queue offer and poll
     * @return a flux of mapped rows
     */
    public <R> Flux<R> queryFlux(String str, NamedParameterJdbcOperations namedParameterJdbcOperations, SqlParameterSource sqlParameterSource, RowMapper<R> rowMapper, int i, long j) {
        return queryFlux(str, namedParameterJdbcOperations, sqlParameterSource, rowMapper, this.scheduler, i, j);
    }

    /**
     * Runs the query on {@code scheduler} and streams the mapped rows through a bounded queue.
     *
     * <p>The query is scheduled when the flux is first subscribed ({@code doFirst}). Cancelling the
     * flux sets the shared closed flag, which makes the producer abort on its next row.
     *
     * @param str SQL to execute
     * @param namedParameterJdbcOperations JDBC operations used to run the query
     * @param sqlParameterSource named parameters for the query
     * @param rowMapper maps each result row to an element of the flux
     * @param scheduler scheduler on which the blocking query is executed
     * @param i capacity of the hand-off queue
     * @param j timeout, in milliseconds, for each queue offer and poll
     * @return a flux of mapped rows
     */
    public <R> Flux<R> queryFlux(String str, NamedParameterJdbcOperations namedParameterJdbcOperations, SqlParameterSource sqlParameterSource, RowMapper<R> rowMapper, Scheduler scheduler, int i, long j) {
        BlockingQueue<FluxItem<R>> queue = new LinkedBlockingQueue<>(i);
        AtomicBoolean closed = new AtomicBoolean(false);
        Flux<R> rows = generateFluxFromQueue(queue, j, closed);
        return rows
            .doOnCancel(() -> closed.set(true))
            .doFirst(() -> {
                scheduler.schedule(() -> produceRows(str, namedParameterJdbcOperations, sqlParameterSource, rowMapper, queue, closed, j));
            });
    }

    /**
     * Producer side: executes the query, enqueues one item per row and finally an end marker.
     * On failure the closed flag is set, the failure is logged, an error item is enqueued on a
     * best-effort basis and the original exception is rethrown to the scheduler.
     */
    private <R> void produceRows(String sql, NamedParameterJdbcOperations operations, SqlParameterSource parameters, RowMapper<R> rowMapper, BlockingQueue<FluxItem<R>> queue, AtomicBoolean closed, long timeout) {
        try {
            operations.query(sql, parameters, new RowCountCallbackHandler() {
                @Override
                public void processRow(ResultSet resultSet, int rowNum) throws SQLException {
                    // Throwing here is what aborts the running query once the stream is closed.
                    if (closed.get()) {
                        throw new DataAccessResourceFailureException("Connection closed by client.");
                    }
                    insertToBlockingQueue(queue, new FluxItem<R>(rowMapper.mapRow(resultSet, rowNum)), closed, timeout);
                }
            });
            if (!closed.get()) {
                insertToBlockingQueue(queue, JdbcReactiveTemplate.<R>endItem(), closed, timeout);
            }
        } catch (Exception e) {
            closed.set(true);
            // Log before enqueueing: the enqueue may itself time out, and the root cause must not be lost.
            this.logger.error("Failed to generate flux.", e);
            try {
                insertToBlockingQueue(queue, JdbcReactiveTemplate.<R>errorItem(e), closed, timeout);
            } catch (RuntimeException enqueueFailure) {
                // Keep the original failure as the primary exception; record the secondary one.
                e.addSuppressed(enqueueFailure);
            }
            throw e;
        }
    }

    /**
     * Hook invoked by the consumer when it dequeues an error item. The default implementation only
     * logs. If this method returns normally the flux completes; if it throws, the thrown exception
     * is emitted as the flux's error signal.
     *
     * @param exc the producer failure; ignored when {@code null}
     * @throws Exception to propagate the failure to subscribers
     */
    protected void handleError(Exception exc) throws Exception {
        if (exc == null) {
            return;
        }
        this.logger.error("Exception occured while reading flux", exc);
    }

    /**
     * Offers {@code item} to the queue, waiting at most {@code timeout} milliseconds.
     *
     * <p>If the offer times out, the closed flag is set and a
     * {@link DataAccessResourceFailureException} is thrown. If the thread is interrupted, the
     * interrupt status is restored, the closed flag is set (the item was dropped, so the stream can
     * no longer be delivered intact) and the method returns without throwing.
     */
    private <R> void insertToBlockingQueue(BlockingQueue<FluxItem<R>> queue, FluxItem<R> item, AtomicBoolean closed, long timeout) {
        boolean accepted;
        try {
            accepted = queue.offer(item, timeout, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            closed.set(true);
            this.logger.error("InterruptedException occurred", e);
            return;
        }
        if (!accepted) {
            closed.set(true);
            throw new DataAccessResourceFailureException("Timeout to get item from queue.", new TimeoutException("Cannot insert into blocking queue."));
        }
    }

    /**
     * Consumer side: builds a flux that emits exactly one signal per generator invocation, taken
     * from the queue. Every error path returns immediately after signalling so that the sink is
     * never touched again and no further blocking poll is attempted.
     */
    private <R> Flux<R> generateFluxFromQueue(BlockingQueue<FluxItem<R>> queue, long timeout, AtomicBoolean closed) {
        return Flux.generate(sink -> {
            if (closed.get()) {
                sink.error(new DataAccessResourceFailureException("Database Connection is closed."));
                return;
            }
            FluxItem<R> item;
            try {
                item = queue.poll(timeout, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                this.logger.error("InterruptedException occurred", e);
                // Tell the producer to stop, and surface the real cause instead of leaving the
                // generator without any signal.
                closed.set(true);
                sink.error(e);
                return;
            }
            if (item == null) {
                // Nobody will read further items; let the producer abort on its next row.
                closed.set(true);
                sink.error(new DataAccessResourceFailureException("Cannot take element from blocking queue."));
                return;
            }
            if (item.isError()) {
                try {
                    handleError(item.getError());
                } catch (Exception e) {
                    sink.error(e);
                    return;
                }
            }
            if (item.isEnd()) {
                sink.complete();
            } else {
                sink.next(item.getItem());
            }
        });
    }
}
