package org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import lombok.NonNull;
import org.apache.iceberg.Table;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.connectors.seatunnel.iceberg.IcebergTableLoader;
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.SourceConfig;
import org.apache.seatunnel.connectors.seatunnel.iceberg.source.split.IcebergFileScanTaskSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.class */
public abstract class AbstractSplitEnumerator implements SourceSplitEnumerator<IcebergFileScanTaskSplit, IcebergSplitEnumeratorState> {
    private static final Logger log = LoggerFactory.getLogger(AbstractSplitEnumerator.class);
    protected final SourceSplitEnumerator.Context<IcebergFileScanTaskSplit> context;
    protected final SourceConfig sourceConfig;
    protected final Map<Integer, List<IcebergFileScanTaskSplit>> pendingSplits;
    protected IcebergTableLoader icebergTableLoader;

    public AbstractSplitEnumerator(@NonNull SourceSplitEnumerator.Context<IcebergFileScanTaskSplit> context, @NonNull SourceConfig sourceConfig, @NonNull Map<Integer, List<IcebergFileScanTaskSplit>> map) {
        if (context == null) {
            throw new NullPointerException("context is marked non-null but is null");
        }
        if (sourceConfig == null) {
            throw new NullPointerException("sourceConfig is marked non-null but is null");
        }
        if (map == null) {
            throw new NullPointerException("pendingSplits is marked non-null but is null");
        }
        this.context = context;
        this.sourceConfig = sourceConfig;
        this.pendingSplits = new HashMap(map);
    }

    public void open() {
        this.icebergTableLoader = IcebergTableLoader.create(this.sourceConfig);
        this.icebergTableLoader.open();
    }

    public void run() {
        refreshPendingSplits();
        assignPendingSplits(this.context.registeredReaders());
    }

    public void close() throws IOException {
        this.icebergTableLoader.close();
    }

    public void addSplitsBack(List<IcebergFileScanTaskSplit> list, int i) {
        addPendingSplits(list);
        if (this.context.registeredReaders().contains(Integer.valueOf(i))) {
            assignPendingSplits(Collections.singleton(Integer.valueOf(i)));
        }
    }

    public int currentUnassignedSplitSize() {
        return this.pendingSplits.size();
    }

    public void registerReader(int i) {
        log.debug("Adding reader {} to IcebergSourceEnumerator.", Integer.valueOf(i));
        assignPendingSplits(Collections.singleton(Integer.valueOf(i)));
    }

    public void notifyCheckpointComplete(long j) throws Exception {
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void refreshPendingSplits() {
        addPendingSplits(loadNewSplits(this.icebergTableLoader.loadTable()));
    }

    protected abstract List<IcebergFileScanTaskSplit> loadNewSplits(Table table);

    private void addPendingSplits(Collection<IcebergFileScanTaskSplit> collection) {
        int currentParallelism = this.context.currentParallelism();
        for (IcebergFileScanTaskSplit icebergFileScanTaskSplit : collection) {
            int hashCode = icebergFileScanTaskSplit.splitId().hashCode() % currentParallelism;
            this.pendingSplits.computeIfAbsent(Integer.valueOf(hashCode), num -> {
                return new ArrayList();
            }).add(icebergFileScanTaskSplit);
            log.info("Assigning {} to {} reader.", icebergFileScanTaskSplit, Integer.valueOf(hashCode));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void assignPendingSplits(Set<Integer> set) {
        Iterator<Integer> it = set.iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            List<IcebergFileScanTaskSplit> remove = this.pendingSplits.remove(Integer.valueOf(intValue));
            if (remove != null && !remove.isEmpty()) {
                log.info("Assign splits {} to reader {}", remove, Integer.valueOf(intValue));
                try {
                    this.context.assignSplit(intValue, remove);
                } catch (Exception e) {
                    log.error("Failed to assign splits {} to reader {}", new Object[]{remove, Integer.valueOf(intValue), e});
                    this.pendingSplits.put(Integer.valueOf(intValue), remove);
                }
            }
        }
    }
}
