package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.FileBasedSource;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/LazyFlinkSourceSplitEnumerator.class */
public class LazyFlinkSourceSplitEnumerator<T> implements SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>> {
    private static final Logger LOG = LoggerFactory.getLogger(LazyFlinkSourceSplitEnumerator.class);
    private final SplitEnumeratorContext<FlinkSourceSplit<T>> context;
    private final Source<T> beamSource;
    private final PipelineOptions pipelineOptions;
    private final int numSplits;
    private final List<FlinkSourceSplit<T>> pendingSplits;
    private boolean splitsInitialized;

    public LazyFlinkSourceSplitEnumerator(SplitEnumeratorContext<FlinkSourceSplit<T>> splitEnumeratorContext, Source<T> source, PipelineOptions pipelineOptions, int i, boolean z) {
        this.context = splitEnumeratorContext;
        this.beamSource = source;
        this.pipelineOptions = pipelineOptions;
        this.numSplits = i;
        this.pendingSplits = new ArrayList(i);
        this.splitsInitialized = z;
    }

    public void start() {
        if (this.splitsInitialized) {
            return;
        }
        initializeSplits();
    }

    public void initializeSplits() {
        this.context.callAsync(() -> {
            try {
                LOG.info("Starting source {}", this.beamSource);
                int i = 0;
                Iterator<? extends Source<T>> it = splitBeamSource().iterator();
                while (it.hasNext()) {
                    this.pendingSplits.add(new FlinkSourceSplit<>(i, it.next()));
                    i++;
                }
                return this.pendingSplits;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }, (list, th) -> {
            if (th != null) {
                this.pendingSplits.addAll(list);
                throw new RuntimeException("Failed to start source enumerator.", th);
            }
        });
    }

    public void handleSplitRequest(int i, @Nullable String str) {
        if (this.context.registeredReaders().containsKey(Integer.valueOf(i))) {
            if (LOG.isInfoEnabled()) {
                LOG.info("Subtask {} {} is requesting a file source split", Integer.valueOf(i), str == null ? "(no host locality info)" : "(on host '" + str + "')");
            }
            if (this.pendingSplits.isEmpty()) {
                this.context.signalNoMoreSplits(i);
                LOG.info("No more splits available for subtask {}", Integer.valueOf(i));
            } else {
                FlinkSourceSplit<T> remove = this.pendingSplits.remove(this.pendingSplits.size() - 1);
                this.context.assignSplit(remove, i);
                LOG.info("Assigned split to subtask {} : {}", Integer.valueOf(i), remove);
            }
        }
    }

    public void addSplitsBack(List<FlinkSourceSplit<T>> list, int i) {
        LOG.info("Adding splits {} back from subtask {}", list, Integer.valueOf(i));
        this.pendingSplits.addAll(list);
    }

    public void addReader(int i) {
    }

    /* renamed from: snapshotState, reason: merged with bridge method [inline-methods] */
    public Map<Integer, List<FlinkSourceSplit<T>>> m58snapshotState(long j) throws Exception {
        LOG.info("Taking snapshot for checkpoint {}", Long.valueOf(j));
        return snapshotState();
    }

    public Map<Integer, List<FlinkSourceSplit<T>>> snapshotState() throws Exception {
        HashMap hashMap = new HashMap(1);
        hashMap.put(1, this.pendingSplits);
        return hashMap;
    }

    public void close() throws IOException {
    }

    private long getDesiredSizeBytes(int i, BoundedSource<T> boundedSource) throws Exception {
        long estimatedSizeBytes = boundedSource.getEstimatedSizeBytes(this.pipelineOptions) / i;
        long j = 0;
        if (this.pipelineOptions != null) {
            j = ((FlinkPipelineOptions) this.pipelineOptions.as(FlinkPipelineOptions.class)).getFileInputSplitMaxSizeMB().longValue();
        }
        return (!(this.beamSource instanceof FileBasedSource) || j <= 0) ? estimatedSizeBytes : Math.min(estimatedSizeBytes, j * 1024 * 1024);
    }

    private List<? extends Source<T>> splitBeamSource() throws Exception {
        if (this.beamSource instanceof BoundedSource) {
            List<? extends Source<T>> split = this.beamSource.split(getDesiredSizeBytes(this.numSplits, (BoundedSource) this.beamSource), this.pipelineOptions);
            LOG.info("Split bounded source {} in {} splits", this.beamSource, Integer.valueOf(split.size()));
            return split;
        }
        if (!(this.beamSource instanceof UnboundedSource)) {
            throw new IllegalStateException("Unknown source type " + this.beamSource.getClass());
        }
        List<? extends Source<T>> split2 = this.beamSource.split(this.numSplits, this.pipelineOptions);
        LOG.info("Split source {} to {} splits", this.beamSource, split2);
        return split2;
    }
}
