package io.debezium.connector.spanner.task;

import com.google.cloud.Timestamp;
import io.debezium.connector.spanner.db.model.InitialPartition;
import io.debezium.connector.spanner.db.model.Partition;
import io.debezium.connector.spanner.kafka.internal.model.PartitionState;
import io.debezium.connector.spanner.metrics.MetricsEventPublisher;
import io.debezium.connector.spanner.metrics.event.PartitionOffsetLagMetricEvent;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/spanner/task/PartitionFactory.class */
/**
 * Builds {@link Partition} instances and, for every partition it creates, publishes a
 * {@link PartitionOffsetLagMetricEvent} carrying the start timestamp that was selected.
 */
public class PartitionFactory {
    private static final Logger LOGGER = LoggerFactory.getLogger(PartitionFactory.class);
    private final PartitionOffsetProvider partitionOffsetProvider;
    private final MetricsEventPublisher metricsEventPublisher;

    /**
     * @param partitionOffsetProvider queried for the stored offset of a partition token
     * @param metricsEventPublisher receives the offset-lag metric events emitted by this factory
     */
    public PartitionFactory(PartitionOffsetProvider partitionOffsetProvider, MetricsEventPublisher metricsEventPublisher) {
        this.partitionOffsetProvider = partitionOffsetProvider;
        this.metricsEventPublisher = metricsEventPublisher;
    }

    /**
     * Creates the partition identified by {@link InitialPartition#PARTITION_TOKEN}, with an
     * empty set of parent tokens, and publishes an offset-lag metric event for it.
     *
     * @param timestamp start timestamp of the partition; also reported in the metric event
     * @param timestamp2 end timestamp of the partition
     * @return the newly built partition
     */
    public Partition initPartition(Timestamp timestamp, Timestamp timestamp2) {
        final Partition initial = Partition.builder()
                .token(InitialPartition.PARTITION_TOKEN)
                .parentTokens(Set.of())
                .startTimestamp(timestamp)
                .endTimestamp(timestamp2)
                .build();
        metricsEventPublisher.publishMetricEvent(
                PartitionOffsetLagMetricEvent.from(initial.getToken(), timestamp));
        return initial;
    }

    /**
     * Creates a partition from the given state. Token, end timestamp and parents are copied
     * from the state; the start timestamp is the one selected by {@link #getOffset}.
     *
     * @param partitionState state describing the partition to build
     * @return the newly built partition
     */
    public Partition getPartition(PartitionState partitionState) {
        return Partition.builder()
                .token(partitionState.getToken())
                .startTimestamp(getOffset(partitionState))
                .endTimestamp(partitionState.getEndTimestamp())
                .parentTokens(partitionState.getParents())
                .build();
    }

    /**
     * Selects the start timestamp for the partition and publishes it as an offset-lag
     * metric event.
     *
     * @param partitionState state of the partition being resolved
     * @return the selected start timestamp
     */
    private Timestamp getOffset(PartitionState partitionState) {
        final Timestamp chosen = chooseStartTimestamp(partitionState);
        metricsEventPublisher.publishMetricEvent(
                PartitionOffsetLagMetricEvent.from(partitionState.getToken(), chosen));
        return chosen;
    }

    /**
     * Picks between the stored offset and the start timestamp recorded in the state.
     * The stored offset is used only when it exists and is not earlier than the state's
     * start timestamp; in every other case the state's start timestamp is used.
     */
    private Timestamp chooseStartTimestamp(PartitionState partitionState) {
        final Timestamp stored = partitionOffsetProvider.getOffset(partitionState.getToken());

        // Nothing stored for this token: fall back to the state's own start timestamp.
        if (stored == null) {
            LOGGER.info("Previous offset not found, start time will be taken {}", Map.of(partitionState.getToken(), partitionState.getStartTimestamp()));
            return partitionState.getStartTimestamp();
        }

        // Stored offset is at or after the state's start timestamp: continue from it.
        if (!stored.toSqlTimestamp().before(partitionState.getStartTimestamp().toSqlTimestamp())) {
            LOGGER.info("Found previous offset {}", Map.of(partitionState.getToken(), stored.toString()));
            return stored;
        }

        // Stored offset is earlier than the state's start timestamp: ignore it.
        LOGGER.warn("Incorrect offset, start time will be taken for partition {}, offsetMap {}", partitionState.getToken(), partitionOffsetProvider.getOffsetMap(partitionState.getToken()));
        return partitionState.getStartTimestamp();
    }
}
