package org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator.scan;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.IncrementalAppendScan;
import org.apache.iceberg.Scan;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.iceberg.exception.IcebergConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.iceberg.exception.IcebergConnectorException;
import org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator.IcebergEnumerationResult;
import org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator.IcebergEnumeratorPosition;
import org.apache.seatunnel.connectors.seatunnel.iceberg.source.split.IcebergFileScanTaskSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/scan/IcebergScanSplitPlanner.class */
/**
 * Static helpers that plan {@link IcebergFileScanTaskSplit}s for an Iceberg {@link Table},
 * driven by an {@link IcebergScanContext}.
 *
 * <p>{@link #planSplits(Table, IcebergScanContext)} performs a single planning pass, while
 * {@link #planStreamSplits(Table, IcebergScanContext, IcebergEnumeratorPosition)} is meant to be
 * called repeatedly with the position returned by the previous call.
 */
public class IcebergScanSplitPlanner {
    private static final Logger log = LoggerFactory.getLogger(IcebergScanSplitPlanner.class);

    /**
     * Plans the next batch of splits for a streaming read.
     *
     * <p>The table metadata is refreshed first. When no position has been enumerated yet
     * ({@code icebergEnumeratorPosition == null}) the initial discovery is performed; otherwise
     * an incremental scan is planned starting from the given position.
     *
     * @param table the Iceberg table to scan
     * @param icebergScanContext scan configuration
     * @param icebergEnumeratorPosition the last enumerated position, or {@code null} on first call
     * @return the discovered splits together with the positions the enumeration covers
     */
    public static IcebergEnumerationResult planStreamSplits(Table table, IcebergScanContext icebergScanContext, IcebergEnumeratorPosition icebergEnumeratorPosition) {
        table.refresh();
        if (icebergEnumeratorPosition == null) {
            return initialStreamSplits(table, icebergScanContext);
        }
        return incrementalStreamSplits(table, icebergScanContext, icebergEnumeratorPosition);
    }

    /**
     * Plans the splits appended between the last enumerated position and the table's current
     * snapshot.
     *
     * <p>Returns an empty result that keeps the position unchanged when the table has no
     * snapshot or when the current snapshot is the one already enumerated.
     *
     * @throws IllegalArgumentException if the table has no snapshot but the last enumerated
     *     position carries a snapshot id
     */
    private static IcebergEnumerationResult incrementalStreamSplits(Table table, IcebergScanContext icebergScanContext, IcebergEnumeratorPosition icebergEnumeratorPosition) {
        Snapshot currentSnapshot = table.currentSnapshot();
        Long lastSnapshotId = icebergEnumeratorPosition.getSnapshotId();

        if (currentSnapshot == null) {
            // An empty table cannot have been enumerated up to a concrete snapshot.
            Preconditions.checkArgument(lastSnapshotId == null, "Invalid last enumerated position for an empty table: not null");
            log.info("Skip incremental scan because table is empty");
            return new IcebergEnumerationResult(Collections.emptyList(), icebergEnumeratorPosition, icebergEnumeratorPosition);
        }

        long currentSnapshotId = currentSnapshot.snapshotId();
        if (lastSnapshotId != null && lastSnapshotId.longValue() == currentSnapshotId) {
            log.debug("Current table snapshot is already enumerated: {}", currentSnapshotId);
            return new IcebergEnumerationResult(Collections.emptyList(), icebergEnumeratorPosition, icebergEnumeratorPosition);
        }

        IcebergEnumeratorPosition toPosition = new IcebergEnumeratorPosition(currentSnapshotId, currentSnapshot.timestampMillis());
        IcebergScanContext incrementalContext = icebergScanContext.copyWithAppendsBetween(lastSnapshotId, currentSnapshotId);
        List<IcebergFileScanTaskSplit> discoveredSplits = planSplits(table, incrementalContext);
        log.info("Discovered {} splits from incremental scan: from snapshot (exclusive) is {}, to snapshot (inclusive) is {}", discoveredSplits.size(), icebergEnumeratorPosition, toPosition);
        return new IcebergEnumerationResult(discoveredSplits, icebergEnumeratorPosition, toPosition);
    }

    /**
     * Performs the first enumeration of a streaming read.
     *
     * <p>If no start snapshot can be determined, an empty result ending at
     * {@link IcebergEnumeratorPosition#EMPTY} is returned. For
     * {@code TABLE_SCAN_THEN_INCREMENTAL} the splits are planned immediately and the resulting
     * position is the start snapshot. For every other strategy no splits are planned here; the
     * resulting position is the start snapshot's parent (or {@code EMPTY} when it has none), so
     * that the following incremental scan, whose lower bound is exclusive, includes the start
     * snapshot itself.
     */
    private static IcebergEnumerationResult initialStreamSplits(Table table, IcebergScanContext icebergScanContext) {
        Optional<Snapshot> streamStartSnapshot = getStreamStartSnapshot(table, icebergScanContext);
        if (!streamStartSnapshot.isPresent()) {
            return new IcebergEnumerationResult(Collections.emptyList(), null, IcebergEnumeratorPosition.EMPTY);
        }

        Snapshot startSnapshot = streamStartSnapshot.get();
        List<IcebergFileScanTaskSplit> initialSplits = Collections.emptyList();
        IcebergEnumeratorPosition toPosition = IcebergEnumeratorPosition.EMPTY;

        if (IcebergStreamScanStrategy.TABLE_SCAN_THEN_INCREMENTAL.equals(icebergScanContext.getStreamScanStrategy())) {
            // NOTE(review): the scan is planned with the unmodified context rather than being
            // pinned to startSnapshot — confirm this is intended if the table can change between
            // the snapshot lookup above and this planning call.
            initialSplits = planSplits(table, icebergScanContext);
            log.info("Discovered {} splits from initial batch table scan with snapshot Id {}", initialSplits.size(), startSnapshot.snapshotId());
            toPosition = new IcebergEnumeratorPosition(startSnapshot.snapshotId(), startSnapshot.timestampMillis());
        } else {
            Long parentId = startSnapshot.parentId();
            if (parentId != null) {
                // The parent may no longer be present in table metadata; keep its id but leave
                // the timestamp unset in that case.
                Snapshot parentSnapshot = table.snapshot(parentId.longValue());
                Long parentTimestamp = null;
                if (parentSnapshot != null) {
                    parentTimestamp = parentSnapshot.timestampMillis();
                }
                toPosition = new IcebergEnumeratorPosition(parentId, parentTimestamp);
            }
            log.info("Start incremental scan with start snapshot (inclusive): id = {}, timestamp = {}", startSnapshot.snapshotId(), startSnapshot.timestampMillis());
        }
        return new IcebergEnumerationResult(initialSplits, null, toPosition);
    }

    /**
     * Resolves the snapshot a streaming read starts from, according to the configured stream
     * scan strategy.
     *
     * @return the start snapshot, or an empty {@link Optional} when the strategy resolves to the
     *     current/oldest snapshot and the table does not have one
     * @throws IllegalArgumentException if the strategy is {@code FROM_SNAPSHOT_ID} and the
     *     configured snapshot id does not exist in the table
     * @throws IcebergConnectorException if the strategy is not supported
     */
    private static Optional<Snapshot> getStreamStartSnapshot(Table table, IcebergScanContext icebergScanContext) {
        switch (icebergScanContext.getStreamScanStrategy()) {
            case TABLE_SCAN_THEN_INCREMENTAL:
            case FROM_LATEST_SNAPSHOT:
                return Optional.ofNullable(table.currentSnapshot());
            case FROM_EARLIEST_SNAPSHOT:
                return Optional.ofNullable(SnapshotUtil.oldestAncestor(table));
            case FROM_SNAPSHOT_ID: {
                Long startSnapshotId = icebergScanContext.getStartSnapshotId();
                Snapshot startSnapshot = table.snapshot(startSnapshotId.longValue());
                // Fail with a descriptive message instead of the bare NullPointerException that
                // Optional.of(null) would raise for an unknown snapshot id.
                Preconditions.checkArgument(startSnapshot != null, "Start snapshot id not found in history: %s", startSnapshotId);
                return Optional.of(startSnapshot);
            }
            case FROM_SNAPSHOT_TIMESTAMP: {
                long startTimestamp = icebergScanContext.getStartSnapshotTimestamp().longValue();
                long snapshotIdAsOfTime = SnapshotUtil.snapshotIdAsOfTime(table, startTimestamp);
                Snapshot snapshotAsOfTime = table.snapshot(snapshotIdAsOfTime);
                if (snapshotAsOfTime.timestampMillis() == startTimestamp) {
                    // Exact timestamp match: start from this snapshot.
                    return Optional.of(snapshotAsOfTime);
                }
                // Otherwise start from the snapshot that follows the one found as of the timestamp.
                return Optional.of(SnapshotUtil.snapshotAfter(table, snapshotIdAsOfTime));
            }
            default:
                throw new IcebergConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION, "Unsupported stream scan strategy: " + icebergScanContext.getStreamScanStrategy());
        }
    }

    /**
     * Plans the scan described by {@code icebergScanContext} and wraps every resulting
     * {@link FileScanTask} in an {@link IcebergFileScanTaskSplit}.
     *
     * <p>The planned task iterable is always closed, including when iteration fails.
     *
     * @param table the Iceberg table to scan
     * @param icebergScanContext scan configuration
     * @return one split per file scan task, in planning order
     * @throws IcebergConnectorException if closing the planned tasks raises an {@link IOException}
     */
    public static List<IcebergFileScanTaskSplit> planSplits(Table table, IcebergScanContext icebergScanContext) {
        try (CloseableIterable<CombinedScanTask> combinedTasks = planTasks(table, icebergScanContext)) {
            List<IcebergFileScanTaskSplit> splits = new ArrayList<>();
            for (CombinedScanTask combinedTask : combinedTasks) {
                for (FileScanTask fileTask : combinedTask.files()) {
                    splits.add(new IcebergFileScanTaskSplit(fileTask));
                }
            }
            return splits;
        } catch (IOException e) {
            throw new IcebergConnectorException(IcebergConnectorErrorCode.FILE_SCAN_SPLIT_FAILED, "Failed to scan iceberg splits from: " + table.name(), e);
        }
    }

    /**
     * Builds and plans the Iceberg scan for the given context.
     *
     * <p>A regular table scan is used when the context is not streaming and has neither a start
     * nor an end snapshot id; it is optionally pinned via the "use snapshot id" and/or
     * "use snapshot timestamp" settings. In every other case an incremental append scan is used,
     * bounded by the start (exclusive) and end snapshot ids when they are set.
     */
    private static CloseableIterable<CombinedScanTask> planTasks(Table table, IcebergScanContext icebergScanContext) {
        boolean batchTableScan = !icebergScanContext.isStreaming()
                && icebergScanContext.getStartSnapshotId() == null
                && icebergScanContext.getEndSnapshotId() == null;

        if (batchTableScan) {
            TableScan tableScan = rebuildScanWithBaseConfig(table.newScan(), icebergScanContext);
            if (icebergScanContext.getUseSnapshotId() != null) {
                tableScan = tableScan.useSnapshot(icebergScanContext.getUseSnapshotId().longValue());
            }
            if (icebergScanContext.getUseSnapshotTimestamp() != null) {
                tableScan = tableScan.asOfTime(icebergScanContext.getUseSnapshotTimestamp().longValue());
            }
            return tableScan.planTasks();
        }

        IncrementalAppendScan appendScan = rebuildScanWithBaseConfig(table.newIncrementalAppendScan(), icebergScanContext);
        if (icebergScanContext.getStartSnapshotId() != null) {
            appendScan = appendScan.fromSnapshotExclusive(icebergScanContext.getStartSnapshotId().longValue());
        }
        if (icebergScanContext.getEndSnapshotId() != null) {
            appendScan = appendScan.toSnapshot(icebergScanContext.getEndSnapshotId().longValue());
        }
        return appendScan.planTasks();
    }

    /**
     * Applies the context settings shared by all scan kinds: case sensitivity, projected schema,
     * and, when set, the row filter and the split size / lookback / open-file-cost options.
     *
     * @param t the scan to configure
     * @param icebergScanContext scan configuration
     * @param <T> the concrete scan type, preserved in the return value
     * @return the configured scan
     */
    private static <T extends Scan<T, FileScanTask, CombinedScanTask>> T rebuildScanWithBaseConfig(T t, IcebergScanContext icebergScanContext) {
        T configured = t.caseSensitive(icebergScanContext.isCaseSensitive()).project(icebergScanContext.getSchema());
        if (icebergScanContext.getFilter() != null) {
            configured = configured.filter(icebergScanContext.getFilter());
        }
        if (icebergScanContext.getSplitSize() != null) {
            configured = configured.option(TableProperties.SPLIT_SIZE, icebergScanContext.getSplitSize().toString());
        }
        if (icebergScanContext.getSplitLookback() != null) {
            configured = configured.option(TableProperties.SPLIT_LOOKBACK, icebergScanContext.getSplitLookback().toString());
        }
        if (icebergScanContext.getSplitOpenFileCost() != null) {
            configured = configured.option(TableProperties.SPLIT_OPEN_FILE_COST, icebergScanContext.getSplitOpenFileCost().toString());
        }
        return configured;
    }
}
