package org.apache.seatunnel.connectors.seatunnel.iceberg.source.reader;

import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import lombok.NonNull;
import org.apache.iceberg.Schema;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.iceberg.IcebergTableLoader;
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.SourceConfig;
import org.apache.seatunnel.connectors.seatunnel.iceberg.data.DefaultDeserializer;
import org.apache.seatunnel.connectors.seatunnel.iceberg.data.Deserializer;
import org.apache.seatunnel.connectors.seatunnel.iceberg.source.split.IcebergFileScanTaskSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergSourceReader.class */
public class IcebergSourceReader implements SourceReader<SeaTunnelRow, IcebergFileScanTaskSplit> {
    private static final Logger log = LoggerFactory.getLogger(IcebergSourceReader.class);
    private static final long POLL_WAIT_MS = 1000;
    private final SourceReader.Context context;
    private final Queue<IcebergFileScanTaskSplit> pendingSplits;
    private final Deserializer deserializer;
    private final Schema tableSchema;
    private final Schema projectedSchema;
    private final SourceConfig sourceConfig;
    private IcebergTableLoader icebergTableLoader;
    private IcebergFileScanTaskSplitReader icebergFileScanTaskSplitReader;
    private IcebergFileScanTaskSplit currentReadSplit;
    private boolean noMoreSplitsAssignment;

    public IcebergSourceReader(@NonNull SourceReader.Context context, @NonNull SeaTunnelRowType seaTunnelRowType, @NonNull Schema schema, @NonNull Schema schema2, @NonNull SourceConfig sourceConfig) {
        if (context == null) {
            throw new NullPointerException("context is marked non-null but is null");
        }
        if (seaTunnelRowType == null) {
            throw new NullPointerException("seaTunnelRowType is marked non-null but is null");
        }
        if (schema == null) {
            throw new NullPointerException("tableSchema is marked non-null but is null");
        }
        if (schema2 == null) {
            throw new NullPointerException("projectedSchema is marked non-null but is null");
        }
        if (sourceConfig == null) {
            throw new NullPointerException("sourceConfig is marked non-null but is null");
        }
        this.context = context;
        this.pendingSplits = new LinkedList();
        this.deserializer = new DefaultDeserializer(seaTunnelRowType, schema2);
        this.tableSchema = schema;
        this.projectedSchema = schema2;
        this.sourceConfig = sourceConfig;
    }

    public void open() {
        this.icebergTableLoader = IcebergTableLoader.create(this.sourceConfig);
        this.icebergTableLoader.open();
        this.icebergFileScanTaskSplitReader = new IcebergFileScanTaskSplitReader(this.deserializer, IcebergFileScanTaskReader.builder().fileIO(this.icebergTableLoader.loadTable().io()).tableSchema(this.tableSchema).projectedSchema(this.projectedSchema).caseSensitive(this.sourceConfig.isCaseSensitive()).reuseContainers(true).build());
    }

    public void close() throws IOException {
        if (this.icebergFileScanTaskSplitReader != null) {
            this.icebergFileScanTaskSplitReader.close();
        }
        this.icebergTableLoader.close();
    }

    public void pollNext(Collector<SeaTunnelRow> collector) throws Exception {
        IcebergFileScanTaskSplit poll = this.pendingSplits.poll();
        while (true) {
            IcebergFileScanTaskSplit icebergFileScanTaskSplit = poll;
            if (icebergFileScanTaskSplit == null) {
                break;
            }
            this.currentReadSplit = icebergFileScanTaskSplit;
            CloseableIterator<SeaTunnelRow> open = this.icebergFileScanTaskSplitReader.open(this.currentReadSplit);
            Throwable th = null;
            while (open.hasNext()) {
                try {
                    try {
                        collector.collect(open.next());
                    } catch (Throwable th2) {
                        th = th2;
                        throw th2;
                    }
                } catch (Throwable th3) {
                    if (open != null) {
                        if (th != null) {
                            try {
                                open.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            open.close();
                        }
                    }
                    throw th3;
                }
            }
            if (open != null) {
                if (0 != 0) {
                    try {
                        open.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    open.close();
                }
            }
            poll = this.pendingSplits.poll();
        }
        if (this.noMoreSplitsAssignment && Boundedness.BOUNDED.equals(this.context.getBoundedness())) {
            this.context.signalNoMoreElement();
            return;
        }
        this.context.sendSplitRequest();
        if (this.pendingSplits.isEmpty()) {
            Thread.sleep(1000L);
        }
    }

    public List<IcebergFileScanTaskSplit> snapshotState(long j) {
        ArrayList arrayList = new ArrayList();
        if (!this.pendingSplits.isEmpty()) {
            arrayList.addAll(this.pendingSplits);
        }
        if (this.currentReadSplit != null) {
            arrayList.add(this.currentReadSplit);
        }
        return arrayList;
    }

    public void addSplits(List<IcebergFileScanTaskSplit> list) {
        log.info("Add {} splits to reader", Integer.valueOf(list.size()));
        this.pendingSplits.addAll(list);
    }

    public void handleNoMoreSplits() {
        log.info("Reader received NoMoreSplits event.");
        this.noMoreSplitsAssignment = true;
    }

    public void notifyCheckpointComplete(long j) throws Exception {
    }
}
