/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.file.src.impl;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SupportsHandleExecutionAttemptSourceEvent;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
import org.apache.flink.connector.file.src.enumerate.DynamicFileEnumerator;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.connector.source.DynamicFilteringData;
import org.apache.flink.table.connector.source.DynamicFilteringEvent;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class DynamicFileSplitEnumerator<SplitT extends FileSourceSplit>
implements SplitEnumerator<SplitT, PendingSplitsCheckpoint<SplitT>>,
SupportsHandleExecutionAttemptSourceEvent {
    private static final Logger LOG = LoggerFactory.getLogger(DynamicFileSplitEnumerator.class);
    private final SplitEnumeratorContext<SplitT> context;
    private final DynamicFileEnumerator.Provider fileEnumeratorFactory;
    private final FileSplitAssigner.Provider splitAssignerFactory;
    private final Set<String> assignedSplits;
    private transient Set<String> allEnumeratingSplits;
    private transient FileSplitAssigner splitAssigner;

    public DynamicFileSplitEnumerator(SplitEnumeratorContext<SplitT> context, DynamicFileEnumerator.Provider fileEnumeratorFactory, FileSplitAssigner.Provider splitAssignerFactory) {
        this.context = (SplitEnumeratorContext)Preconditions.checkNotNull(context);
        this.splitAssignerFactory = (FileSplitAssigner.Provider)Preconditions.checkNotNull((Object)splitAssignerFactory);
        this.fileEnumeratorFactory = (DynamicFileEnumerator.Provider)Preconditions.checkNotNull((Object)fileEnumeratorFactory);
        this.assignedSplits = new HashSet<String>();
    }

    public void start() {
    }

    public void close() throws IOException {
    }

    public void addReader(int subtaskId) {
    }

    public void handleSplitRequest(int subtask, @Nullable String hostname) {
        Optional<FileSourceSplit> nextSplit;
        if (!this.context.registeredReaders().containsKey(subtask)) {
            return;
        }
        if (this.splitAssigner == null) {
            this.createSplitAssigner(null);
        }
        if (LOG.isDebugEnabled()) {
            Object hostInfo = hostname == null ? "(no host locality info)" : "(on host '" + hostname + "')";
            LOG.debug("Subtask {} {} is requesting a file source split", (Object)subtask, hostInfo);
        }
        if ((nextSplit = this.getNextUnassignedSplit(hostname)).isPresent()) {
            FileSourceSplit split = nextSplit.get();
            this.context.assignSplit((SourceSplit)split, subtask);
            this.assignedSplits.add(split.splitId());
            LOG.debug("Assigned split to subtask {} : {}", (Object)subtask, (Object)split);
        } else {
            this.context.signalNoMoreSplits(subtask);
            LOG.info("No more splits available for subtask {}", (Object)subtask);
        }
    }

    private Optional<FileSourceSplit> getNextUnassignedSplit(String hostname) {
        Optional<FileSourceSplit> nextSplit = this.splitAssigner.getNext(hostname);
        while (nextSplit.isPresent()) {
            FileSourceSplit split = nextSplit.get();
            if (!this.assignedSplits.contains(split.splitId())) {
                return nextSplit;
            }
            nextSplit = this.splitAssigner.getNext(hostname);
        }
        return nextSplit;
    }

    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
        if (sourceEvent instanceof DynamicFilteringEvent) {
            LOG.warn("Received DynamicFilteringEvent: {}", (Object)subtaskId);
            this.createSplitAssigner(((DynamicFilteringEvent)sourceEvent).getData());
        } else {
            LOG.error("Received unrecognized event: {}", (Object)sourceEvent);
        }
    }

    private void createSplitAssigner(@Nullable DynamicFilteringData dynamicFilteringData) {
        Collection<FileSourceSplit> splits;
        DynamicFileEnumerator fileEnumerator = this.fileEnumeratorFactory.create();
        if (dynamicFilteringData != null) {
            fileEnumerator.setDynamicFilteringData(dynamicFilteringData);
        }
        try {
            splits = fileEnumerator.enumerateSplits(new Path[1], this.context.currentParallelism());
            this.allEnumeratingSplits = splits.stream().map(FileSourceSplit::splitId).collect(Collectors.toSet());
        }
        catch (IOException e) {
            throw new FlinkRuntimeException("Could not enumerate file splits", (Throwable)e);
        }
        this.splitAssigner = this.splitAssignerFactory.create(splits);
    }

    public void addSplitsBack(List<SplitT> splits, int subtaskId) {
        LOG.debug("Dynamic File Source Enumerator adds splits back: {}", splits);
        if (this.splitAssigner != null) {
            ArrayList<FileSourceSplit> fileSplits = new ArrayList<FileSourceSplit>(splits);
            fileSplits.removeIf(s -> !this.allEnumeratingSplits.contains(s.splitId()));
            fileSplits.forEach(s -> this.assignedSplits.remove(s.splitId()));
            this.splitAssigner.addSplits(fileSplits);
        }
    }

    public PendingSplitsCheckpoint<SplitT> snapshotState(long checkpointId) {
        throw new UnsupportedOperationException("DynamicFileSplitEnumerator only supports batch execution.");
    }

    public void handleSourceEvent(int subtaskId, int attemptNumber, SourceEvent sourceEvent) {
        this.handleSourceEvent(subtaskId, sourceEvent);
    }
}

