package org.apache.beam.sdk.io.gcp.spanner.changestreams.action;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.SpannerException;
import java.util.Objects;
import java.util.Optional;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamDao;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamResultSet;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.ChangeStreamRecordMapper;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.PartitionMetadataMapper;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChangeStreamRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.RestrictionInterrupter;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.class */
/**
 * Runs a change stream query for a single partition and dispatches every returned record to the
 * action matching its type ({@link DataChangeRecord}, {@link HeartbeatRecord} or {@link
 * ChildPartitionsRecord}).
 *
 * <p>When the query is exhausted (or Spanner reports the requested start timestamp as out of
 * range) the action tries to claim the partition end timestamp and, if the claim succeeds, marks
 * the partition as finished in the metadata table.
 */
public class QueryChangeStreamAction {
    private static final Logger LOG = LoggerFactory.getLogger(QueryChangeStreamAction.class);

    /** How long after now the bundle finalizer callback registration stays valid. */
    private static final Duration BUNDLE_FINALIZER_TIMEOUT = Duration.standardMinutes(5);

    /** Soft timeout handed to the {@link RestrictionInterrupter} shared by the record actions. */
    private static final Duration RESTRICTION_TRACKER_TIMEOUT = Duration.standardSeconds(40);

    /** Message fragment identifying a Spanner error caused by an invalid start timestamp. */
    private static final String OUT_OF_RANGE_ERROR_MESSAGE = "Specified start_timestamp is invalid";

    private final ChangeStreamDao changeStreamDao;
    private final PartitionMetadataDao partitionMetadataDao;
    private final ChangeStreamRecordMapper changeStreamRecordMapper;
    private final PartitionMetadataMapper partitionMetadataMapper;
    private final DataChangeRecordAction dataChangeRecordAction;
    private final HeartbeatRecordAction heartbeatRecordAction;
    private final ChildPartitionsRecordAction childPartitionsRecordAction;
    private final ChangeStreamMetrics metrics;

    /**
     * Creates the action with all of its collaborators.
     *
     * @param changeStreamDao DAO used to issue the change stream query
     * @param partitionMetadataDao DAO used to read and update the partition metadata table
     * @param changeStreamRecordMapper mapper turning result set rows into change stream records
     * @param partitionMetadataMapper mapper turning a metadata table row into a {@link
     *     PartitionMetadata}
     * @param dataChangeRecordAction handler for {@link DataChangeRecord}s
     * @param heartbeatRecordAction handler for {@link HeartbeatRecord}s
     * @param childPartitionsRecordAction handler for {@link ChildPartitionsRecord}s
     * @param changeStreamMetrics metrics container updated while querying
     */
    public QueryChangeStreamAction(ChangeStreamDao changeStreamDao, PartitionMetadataDao partitionMetadataDao, ChangeStreamRecordMapper changeStreamRecordMapper, PartitionMetadataMapper partitionMetadataMapper, DataChangeRecordAction dataChangeRecordAction, HeartbeatRecordAction heartbeatRecordAction, ChildPartitionsRecordAction childPartitionsRecordAction, ChangeStreamMetrics changeStreamMetrics) {
        this.changeStreamDao = changeStreamDao;
        this.partitionMetadataDao = partitionMetadataDao;
        this.changeStreamRecordMapper = changeStreamRecordMapper;
        this.partitionMetadataMapper = partitionMetadataMapper;
        this.dataChangeRecordAction = dataChangeRecordAction;
        this.heartbeatRecordAction = heartbeatRecordAction;
        this.childPartitionsRecordAction = childPartitionsRecordAction;
        this.metrics = changeStreamMetrics;
    }

    /**
     * Queries the change stream for the given partition, starting at the lower bound of the
     * tracker's current restriction and ending at the partition end timestamp.
     *
     * <p>Each record is forwarded to the matching record action. As soon as one of them returns a
     * {@link DoFn.ProcessContinuation}, a watermark-update callback is registered with the bundle
     * finalizer and that continuation is returned. If the query runs to completion, or fails with
     * an "out of range" {@link SpannerException}, the end timestamp is claimed and the partition
     * is marked as finished.
     *
     * @param partitionMetadata the partition to query
     * @param restrictionTracker tracker holding the timestamp range to be processed
     * @param outputReceiver receiver for the emitted {@link DataChangeRecord}s
     * @param manualWatermarkEstimator estimator passed to the record actions and read when the
     *     watermark is persisted
     * @param bundleFinalizer finalizer used to persist the watermark after the bundle commits
     * @return the continuation requested by a record action, or {@link
     *     DoFn.ProcessContinuation#stop()} once the query is done
     * @throws IllegalStateException if the partition is missing from the metadata table
     * @throws IllegalArgumentException if the query returns a record of an unknown type
     * @throws SpannerException if the query fails for a reason other than an out-of-range start
     *     timestamp
     */
    @VisibleForTesting
    public DoFn.ProcessContinuation run(PartitionMetadata partitionMetadata, RestrictionTracker<TimestampRange, Timestamp> restrictionTracker, DoFn.OutputReceiver<DataChangeRecord> outputReceiver, ManualWatermarkEstimator<Instant> manualWatermarkEstimator, DoFn.BundleFinalizer bundleFinalizer) {
        final String token = partitionMetadata.getPartitionToken();
        final Timestamp startTimestamp = restrictionTracker.currentRestriction().getFrom();
        final Timestamp endTimestamp = partitionMetadata.getEndTimestamp();

        // Re-read the partition from the metadata table; this is the instance handed to the
        // mapper and to the record actions below.
        final PartitionMetadata updatedPartition =
            Optional.ofNullable(this.partitionMetadataDao.getPartition(token))
                .map(this.partitionMetadataMapper::from)
                .orElseThrow(() -> new IllegalStateException("Partition " + token + " not found in metadata table"));

        final RestrictionInterrupter<Timestamp> interrupter = RestrictionInterrupter.withSoftTimeout(RESTRICTION_TRACKER_TIMEOUT);

        try (ChangeStreamResultSet resultSet = this.changeStreamDao.changeStreamQuery(token, startTimestamp, endTimestamp, partitionMetadata.getHeartbeatMillis())) {
            this.metrics.incQueryCounter();
            while (resultSet.next()) {
                for (ChangeStreamRecord changeStreamRecord : this.changeStreamRecordMapper.toChangeStreamRecords(updatedPartition, resultSet, resultSet.getMetadata())) {
                    final Optional<DoFn.ProcessContinuation> maybeContinuation =
                        processRecord(token, updatedPartition, changeStreamRecord, restrictionTracker, interrupter, outputReceiver, manualWatermarkEstimator);
                    if (maybeContinuation.isPresent()) {
                        LOG.debug("[{}] Continuation present, returning {}", token, maybeContinuation);
                        scheduleWatermarkUpdate(bundleFinalizer, token, manualWatermarkEstimator);
                        return maybeContinuation.get();
                    }
                }
            }
            scheduleWatermarkUpdate(bundleFinalizer, token, manualWatermarkEstimator);
        } catch (SpannerException e) {
            // Must be caught before the generic Exception handler below, otherwise this clause is
            // unreachable and an out-of-range start timestamp could never finish the stream.
            if (!isTimestampOutOfRange(e)) {
                throw e;
            }
            LOG.info("[{}] query change stream is out of range for {} to {}, finishing stream.", token, startTimestamp, endTimestamp, e);
        } catch (Exception e) {
            LOG.error("[{}] query change stream had exception processing range {} to {}.", token, startTimestamp, endTimestamp, e);
            throw e;
        }

        LOG.debug("[{}] change stream completed successfully", token);
        if (restrictionTracker.tryClaim(endTimestamp)) {
            LOG.debug("[{}] Finishing partition", token);
            this.partitionMetadataDao.updateToFinished(token);
            this.metrics.decActivePartitionReadCounter();
            LOG.info("[{}] After attempting to finish the partition", token);
        }
        return DoFn.ProcessContinuation.stop();
    }

    /** Routes a single record to the action matching its concrete type. */
    private Optional<DoFn.ProcessContinuation> processRecord(String token, PartitionMetadata partition, ChangeStreamRecord changeStreamRecord, RestrictionTracker<TimestampRange, Timestamp> tracker, RestrictionInterrupter<Timestamp> interrupter, DoFn.OutputReceiver<DataChangeRecord> receiver, ManualWatermarkEstimator<Instant> watermarkEstimator) {
        if (changeStreamRecord instanceof DataChangeRecord) {
            return this.dataChangeRecordAction.run(partition, (DataChangeRecord) changeStreamRecord, tracker, interrupter, receiver, watermarkEstimator);
        }
        if (changeStreamRecord instanceof HeartbeatRecord) {
            return this.heartbeatRecordAction.run(partition, (HeartbeatRecord) changeStreamRecord, tracker, interrupter, watermarkEstimator);
        }
        if (changeStreamRecord instanceof ChildPartitionsRecord) {
            return this.childPartitionsRecordAction.run(partition, (ChildPartitionsRecord) changeStreamRecord, tracker, interrupter, watermarkEstimator);
        }
        LOG.error("[{}] Unknown record type {}", token, changeStreamRecord.getClass());
        throw new IllegalArgumentException("Unknown record type " + changeStreamRecord.getClass());
    }

    /** Registers the watermark-persisting callback to run after the current bundle commits. */
    private void scheduleWatermarkUpdate(DoFn.BundleFinalizer bundleFinalizer, String token, WatermarkEstimator<Instant> watermarkEstimator) {
        bundleFinalizer.afterBundleCommit(Instant.now().plus(BUNDLE_FINALIZER_TIMEOUT), updateWatermarkCallback(token, watermarkEstimator));
    }

    /**
     * Builds the callback that writes the estimator's current watermark to the metadata table.
     *
     * <p>A {@link SpannerException} raised by the update is logged and not rethrown: at debug
     * level when the error code is {@code NOT_FOUND}, at error level otherwise.
     */
    private DoFn.BundleFinalizer.Callback updateWatermarkCallback(String str, WatermarkEstimator<Instant> watermarkEstimator) {
        return () -> {
            final Instant watermark = watermarkEstimator.currentWatermark();
            LOG.debug("[{}] Updating current watermark to {}", str, watermark);
            try {
                // The watermark is read in milliseconds; the Timestamp factory expects microseconds.
                this.partitionMetadataDao.updateWatermark(str, Timestamp.ofTimeMicroseconds(watermark.getMillis() * 1000));
            } catch (SpannerException e) {
                if (e.getErrorCode() == ErrorCode.NOT_FOUND) {
                    LOG.debug("[{}] Unable to update the current watermark, partition NOT FOUND", str);
                } else {
                    LOG.error("[{}] Error updating the current watermark: {}", str, e.getMessage(), e);
                }
            }
        };
    }

    /**
     * Tells whether the exception has error code {@code INVALID_ARGUMENT} or {@code OUT_OF_RANGE}
     * and a message containing {@link #OUT_OF_RANGE_ERROR_MESSAGE}.
     */
    private boolean isTimestampOutOfRange(SpannerException spannerException) {
        final ErrorCode errorCode = spannerException.getErrorCode();
        if (errorCode != ErrorCode.INVALID_ARGUMENT && errorCode != ErrorCode.OUT_OF_RANGE) {
            return false;
        }
        final String message = spannerException.getMessage();
        return message != null && message.contains(OUT_OF_RANGE_ERROR_MESSAGE);
    }
}
