package com.datastax.oss.dsbulk.workflow.unload;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.shaded.guava.common.base.Stopwatch;
import com.datastax.oss.dsbulk.codecs.api.ConvertingCodecFactory;
import com.datastax.oss.dsbulk.connectors.api.CommonConnectorFeature;
import com.datastax.oss.dsbulk.connectors.api.Connector;
import com.datastax.oss.dsbulk.connectors.api.Record;
import com.datastax.oss.dsbulk.executor.api.reader.BulkReader;
import com.datastax.oss.dsbulk.executor.api.result.ReadResult;
import com.datastax.oss.dsbulk.workflow.api.Workflow;
import com.datastax.oss.dsbulk.workflow.api.utils.DurationUtils;
import com.datastax.oss.dsbulk.workflow.commons.log.DefaultRangeReadResource;
import com.datastax.oss.dsbulk.workflow.commons.log.LogManager;
import com.datastax.oss.dsbulk.workflow.commons.log.RangeReadResource;
import com.datastax.oss.dsbulk.workflow.commons.metrics.MetricsManager;
import com.datastax.oss.dsbulk.workflow.commons.schema.ReadResultMapper;
import com.datastax.oss.dsbulk.workflow.commons.settings.CodecSettings;
import com.datastax.oss.dsbulk.workflow.commons.settings.ConnectorSettings;
import com.datastax.oss.dsbulk.workflow.commons.settings.DriverSettings;
import com.datastax.oss.dsbulk.workflow.commons.settings.EngineSettings;
import com.datastax.oss.dsbulk.workflow.commons.settings.ExecutorSettings;
import com.datastax.oss.dsbulk.workflow.commons.settings.LogSettings;
import com.datastax.oss.dsbulk.workflow.commons.settings.MonitoringSettings;
import com.datastax.oss.dsbulk.workflow.commons.settings.SchemaGenerationStrategy;
import com.datastax.oss.dsbulk.workflow.commons.settings.SchemaSettings;
import com.datastax.oss.dsbulk.workflow.commons.settings.SettingsManager;
import com.datastax.oss.dsbulk.workflow.commons.statement.RangeReadBoundStatement;
import com.datastax.oss.dsbulk.workflow.commons.utils.CloseableUtils;
import com.datastax.oss.dsbulk.workflow.commons.utils.ClusterInformationUtils;
import com.typesafe.config.Config;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.time.Duration;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:com/datastax/oss/dsbulk/workflow/unload/UnloadWorkflow.class */
public class UnloadWorkflow implements Workflow {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) UnloadWorkflow.class);
    private final SettingsManager settingsManager;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private String executionId;
    private Connector connector;
    private Set<Scheduler> schedulers;
    private ReadResultMapper readResultMapper;
    private MetricsManager metricsManager;
    private LogManager logManager;
    private CqlSession session;
    private BulkReader executor;
    private List<RangeReadBoundStatement> readStatements;
    private Function<Publisher<Record>, Publisher<Record>> writer;
    private Function<Flux<ReadResult>, Flux<ReadResult>> totalItemsMonitor;
    private Function<Flux<Record>, Flux<Record>> failedRecordsMonitor;
    private Function<Flux<ReadResult>, Flux<ReadResult>> failedReadResultsMonitor;
    private Function<Flux<Record>, Flux<Record>> failedRecordsHandler;
    private Function<Flux<ReadResult>, Flux<ReadResult>> totalItemsCounter;
    private Function<Flux<ReadResult>, Flux<ReadResult>> failedReadsHandler;
    private Function<Flux<ReadResult>, Flux<ReadResult>> queryWarningsHandler;
    private Function<Flux<Record>, Flux<Record>> unmappableRecordsHandler;
    private Function<Flux<Record>, Flux<Void>> successfulRecordsHandler;
    private Function<Flux<RangeReadResource>, Flux<Flux<ReadResult>>> checkpointHandler;
    private Function<Flux<Void>, Flux<Void>> terminationHandler;
    private int readConcurrency;
    private int numCores;
    private int writeConcurrency;

    /* JADX INFO: Access modifiers changed from: package-private */
    public UnloadWorkflow(Config config) {
        this.settingsManager = new SettingsManager(config);
    }

    @Override // com.datastax.oss.dsbulk.workflow.api.Workflow
    public void init() throws Exception {
        this.settingsManager.init("UNLOAD", false, SchemaGenerationStrategy.READ_AND_MAP);
        this.executionId = this.settingsManager.getExecutionId();
        LogSettings logSettings = this.settingsManager.getLogSettings();
        DriverSettings driverSettings = this.settingsManager.getDriverSettings();
        ConnectorSettings connectorSettings = this.settingsManager.getConnectorSettings();
        SchemaSettings schemaSettings = this.settingsManager.getSchemaSettings();
        ExecutorSettings executorSettings = this.settingsManager.getExecutorSettings();
        CodecSettings codecSettings = this.settingsManager.getCodecSettings();
        MonitoringSettings monitoringSettings = this.settingsManager.getMonitoringSettings();
        EngineSettings engineSettings = this.settingsManager.getEngineSettings();
        engineSettings.init();
        if (engineSettings.isDryRun()) {
            throw new IllegalArgumentException("Dry-run is not supported for unload");
        }
        logSettings.init();
        connectorSettings.init(false);
        this.connector = connectorSettings.getConnector();
        this.connector.init();
        driverSettings.init(false);
        logSettings.logEffectiveSettings(this.settingsManager.getEffectiveBulkLoaderConfig(), driverSettings.getDriverConfig());
        codecSettings.init();
        monitoringSettings.init();
        executorSettings.init();
        ConvertingCodecFactory createCodecFactory = codecSettings.createCodecFactory(schemaSettings.isAllowExtraFields(), schemaSettings.isAllowMissingFields());
        this.session = driverSettings.newSession(this.executionId, createCodecFactory.getCodecRegistry(), monitoringSettings.getRegistry());
        ClusterInformationUtils.printDebugInfoAboutCluster(this.session);
        schemaSettings.init(this.session, createCodecFactory, this.connector.supports(CommonConnectorFeature.INDEXED_RECORDS), this.connector.supports(CommonConnectorFeature.MAPPED_RECORDS));
        this.logManager = logSettings.newLogManager(this.session);
        this.logManager.init();
        if (executorSettings.isTrackingBytes()) {
            monitoringSettings.forceTrackBytes();
        }
        this.metricsManager = monitoringSettings.newMetricsManager(false, false, this.logManager.getOperationDirectory(), logSettings.getVerbosity(), this.session.getContext().getProtocolVersion(), this.session.getContext().getCodecRegistry(), schemaSettings.getRowType());
        this.metricsManager.init(this.logManager.getTotalItems(), this.logManager.getTotalErrors());
        this.readResultMapper = schemaSettings.createReadResultMapper(this.session, this.connector.getRecordMetadata(), logSettings.isSources());
        this.readStatements = schemaSettings.createReadStatements(this.session);
        this.executor = executorSettings.newReadExecutor(this.session, this.metricsManager.getExecutionListener(), schemaSettings.isSearchQuery());
        this.closed.set(false);
        this.writer = this.connector.write();
        this.totalItemsMonitor = this.metricsManager.newTotalItemsMonitor();
        this.failedRecordsMonitor = this.metricsManager.newFailedRecordsMonitor();
        this.failedReadResultsMonitor = this.metricsManager.newFailedResultsMonitor();
        this.failedRecordsHandler = this.logManager.newFailedRecordsHandler();
        this.totalItemsCounter = this.logManager.newTotalItemsCounter();
        this.failedReadsHandler = this.logManager.newFailedReadsHandler();
        this.queryWarningsHandler = this.logManager.newQueryWarningsHandler();
        this.unmappableRecordsHandler = this.logManager.newUnmappableRecordsHandler();
        this.successfulRecordsHandler = this.logManager.newSuccessfulRecordsHandler();
        this.checkpointHandler = this.logManager.newRangeReadCheckpointHandler();
        this.terminationHandler = this.logManager.newTerminationHandler();
        this.numCores = Runtime.getRuntime().availableProcessors();
        if (this.connector.writeConcurrency() < 1) {
            throw new IllegalArgumentException("Invalid write concurrency: " + this.connector.writeConcurrency());
        }
        this.writeConcurrency = this.connector.writeConcurrency();
        LOGGER.debug("Using write concurrency: {}", Integer.valueOf(this.writeConcurrency));
        this.readConcurrency = Math.min(this.readStatements.size(), engineSettings.getMaxConcurrentQueries().orElse(this.numCores));
        LOGGER.debug("Using read concurrency: {} (user-supplied: {})", Integer.valueOf(this.readConcurrency), Boolean.valueOf(engineSettings.getMaxConcurrentQueries().isPresent()));
        this.schedulers = new HashSet();
    }

    @Override // com.datastax.oss.dsbulk.workflow.api.Workflow
    public boolean execute() {
        LOGGER.debug("{} started.", this);
        this.metricsManager.start();
        Flux<Void> oneWriter = this.writeConcurrency == 1 ? oneWriter() : (this.writeConcurrency < this.numCores / 2 || this.readConcurrency < this.numCores / 2) ? fewWriters() : manyWriters();
        Stopwatch createStarted = Stopwatch.createStarted();
        oneWriter.transform(this.terminationHandler).blockLast();
        createStarted.stop();
        int totalErrors = this.logManager.getTotalErrors();
        this.metricsManager.stop(createStarted.elapsed(), totalErrors == 0);
        Duration round = DurationUtils.round(createStarted.elapsed(), TimeUnit.SECONDS);
        String formatDuration = round.isZero() ? "less than one second" : DurationUtils.formatDuration(round);
        if (totalErrors == 0) {
            LOGGER.info("{} completed successfully in {}.", this, formatDuration);
        } else {
            LOGGER.warn("{} completed with {} errors in {}.", this, String.format("%,d", Integer.valueOf(totalErrors)), formatDuration);
        }
        return totalErrors == 0;
    }

    private Flux<Void> oneWriter() {
        int min = Math.min(this.numCores * 2, this.readConcurrency);
        Scheduler immediate = min == 1 ? Schedulers.immediate() : Schedulers.newParallel(min, new DefaultThreadFactory("workflow"));
        this.schedulers.add(immediate);
        return Flux.fromIterable(this.readStatements).map(rangeReadBoundStatement -> {
            return new DefaultRangeReadResource(rangeReadBoundStatement, this.executor);
        }).transform(this.checkpointHandler).flatMap(flux -> {
            Flux transform = flux.publishOn(immediate, 500).transform(this.queryWarningsHandler).transform(this.totalItemsMonitor).transform(this.totalItemsCounter).transform(this.failedReadResultsMonitor).transform(this.failedReadsHandler);
            ReadResultMapper readResultMapper = this.readResultMapper;
            Objects.requireNonNull(readResultMapper);
            return transform.map(readResultMapper::map).transform(this.failedRecordsMonitor).transform(this.unmappableRecordsHandler);
        }, this.readConcurrency, 500).transform(this.writer).transform(this.failedRecordsMonitor).transform(this.failedRecordsHandler).transform(this.successfulRecordsHandler);
    }

    private Flux<Void> fewWriters() {
        int min = Math.min(this.numCores, this.readConcurrency);
        Scheduler immediate = min == 1 ? Schedulers.immediate() : Schedulers.newParallel(min, new DefaultThreadFactory("workflow-read"));
        Scheduler newParallel = Schedulers.newParallel(Math.min(this.numCores, this.writeConcurrency), new DefaultThreadFactory("workflow-write"));
        this.schedulers.add(immediate);
        this.schedulers.add(newParallel);
        return Flux.fromIterable(this.readStatements).map(rangeReadBoundStatement -> {
            return new DefaultRangeReadResource(rangeReadBoundStatement, this.executor);
        }).transform(this.checkpointHandler).flatMap(flux -> {
            Flux transform = flux.publishOn(immediate, 500).transform(this.queryWarningsHandler).transform(this.totalItemsMonitor).transform(this.totalItemsCounter).transform(this.failedReadResultsMonitor).transform(this.failedReadsHandler);
            ReadResultMapper readResultMapper = this.readResultMapper;
            Objects.requireNonNull(readResultMapper);
            return transform.map(readResultMapper::map).transform(this.failedRecordsMonitor).transform(this.unmappableRecordsHandler);
        }, this.readConcurrency, 500).parallel(this.writeConcurrency).runOn(newParallel).groups().flatMap(groupedFlux -> {
            return groupedFlux.transform(this.writer).transform(this.failedRecordsMonitor).transform(this.failedRecordsHandler).transform(this.successfulRecordsHandler);
        }, this.writeConcurrency, 500);
    }

    private Flux<Void> manyWriters() {
        int min = Math.min(this.readConcurrency, this.writeConcurrency);
        Scheduler newParallel = Schedulers.newParallel(Math.min(this.numCores * 2, min), new DefaultThreadFactory("workflow"));
        this.schedulers.add(newParallel);
        return Flux.fromIterable(this.readStatements).map(rangeReadBoundStatement -> {
            return new DefaultRangeReadResource(rangeReadBoundStatement, this.executor);
        }).transform(this.checkpointHandler).flatMap(flux -> {
            Flux transform = flux.publishOn(newParallel, 500).transform(this.queryWarningsHandler).transform(this.totalItemsMonitor).transform(this.totalItemsCounter).transform(this.failedReadResultsMonitor).transform(this.failedReadsHandler);
            ReadResultMapper readResultMapper = this.readResultMapper;
            Objects.requireNonNull(readResultMapper);
            Flux transform2 = transform.map(readResultMapper::map).transform(this.failedRecordsMonitor).transform(this.unmappableRecordsHandler);
            return (min == this.writeConcurrency ? transform2.transform(this.writer) : transform2.window(500).flatMap(flux -> {
                return flux.transform(this.writer);
            }, 1, 500)).transform(this.failedRecordsMonitor).transform(this.failedRecordsHandler).transform(this.successfulRecordsHandler);
        }, min, 500);
    }

    @Override // com.datastax.oss.dsbulk.workflow.api.Workflow, java.lang.AutoCloseable
    public void close() throws Exception {
        if (this.closed.compareAndSet(false, true)) {
            LOGGER.debug("{} closing.", this);
            Exception closeQuietly = CloseableUtils.closeQuietly(this.connector, CloseableUtils.closeQuietly(this.logManager, CloseableUtils.closeQuietly(this.metricsManager, (Exception) null)));
            if (this.schedulers != null) {
                Iterator<Scheduler> it = this.schedulers.iterator();
                while (it.hasNext()) {
                    closeQuietly = CloseableUtils.closeQuietly(it.next(), closeQuietly);
                }
            }
            Exception closeQuietly2 = CloseableUtils.closeQuietly(this.session, CloseableUtils.closeQuietly(this.executor, closeQuietly));
            if (this.metricsManager != null) {
                this.metricsManager.reportFinalMetrics();
            }
            if (this.logManager != null) {
                this.logManager.reportAvailableFiles();
            }
            LOGGER.debug("{} closed.", this);
            if (closeQuietly2 != null) {
                throw closeQuietly2;
            }
        }
    }

    public String toString() {
        return this.executionId == null ? "Operation" : "Operation " + this.executionId;
    }
}
