package org.apache.flink.connector.base.source.reader.fetcher;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.class */
public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> {
    private static final Logger LOG = LoggerFactory.getLogger(SplitFetcherManager.class);
    private final Consumer<Throwable> errorHandler;
    private final AtomicInteger fetcherIdGenerator;
    private final Supplier<SplitReader<E, SplitT>> splitReaderFactory;
    private final AtomicReference<Throwable> uncaughtFetcherException;
    private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
    protected final Map<Integer, SplitFetcher<E, SplitT>> fetchers;
    private final ExecutorService executors;
    private volatile boolean closed;
    private final Consumer<Collection<String>> splitFinishedHook;
    private final boolean allowUnalignedSourceSplits;

    public SplitFetcherManager(FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> futureCompletingBlockingQueue, Supplier<SplitReader<E, SplitT>> supplier, Configuration configuration) {
        this(futureCompletingBlockingQueue, supplier, configuration, collection -> {
        });
    }

    @VisibleForTesting
    public SplitFetcherManager(final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> futureCompletingBlockingQueue, Supplier<SplitReader<E, SplitT>> supplier, Configuration configuration, Consumer<Collection<String>> consumer) {
        this.elementsQueue = futureCompletingBlockingQueue;
        this.errorHandler = new Consumer<Throwable>() { // from class: org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.1
            @Override // java.util.function.Consumer
            public void accept(Throwable th) {
                SplitFetcherManager.LOG.error("Received uncaught exception.", th);
                if (!SplitFetcherManager.this.uncaughtFetcherException.compareAndSet(null, th)) {
                    ((Throwable) SplitFetcherManager.this.uncaughtFetcherException.get()).addSuppressed(th);
                }
                futureCompletingBlockingQueue.notifyAvailable();
            }
        };
        this.splitReaderFactory = supplier;
        this.splitFinishedHook = consumer;
        this.uncaughtFetcherException = new AtomicReference<>(null);
        this.fetcherIdGenerator = new AtomicInteger(0);
        this.fetchers = new ConcurrentHashMap();
        this.allowUnalignedSourceSplits = ((Boolean) configuration.get(PipelineOptions.ALLOW_UNALIGNED_SOURCE_SPLITS)).booleanValue();
        String name = Thread.currentThread().getName();
        this.executors = Executors.newCachedThreadPool(runnable -> {
            return new Thread(runnable, "Source Data Fetcher for " + name);
        });
        this.closed = false;
    }

    public abstract void addSplits(List<SplitT> list);

    public abstract void removeSplits(List<SplitT> list);

    public void pauseOrResumeSplits(Collection<String> collection, Collection<String> collection2) {
        for (SplitFetcher<E, SplitT> splitFetcher : this.fetchers.values()) {
            Map<String, SplitT> assignedSplits = splitFetcher.assignedSplits();
            List<SplitT> lookupInAssignment = lookupInAssignment(collection, assignedSplits);
            List<SplitT> lookupInAssignment2 = lookupInAssignment(collection2, assignedSplits);
            if (!lookupInAssignment.isEmpty() || !lookupInAssignment2.isEmpty()) {
                splitFetcher.pauseOrResumeSplits(lookupInAssignment, lookupInAssignment2);
            }
        }
    }

    private List<SplitT> lookupInAssignment(Collection<String> collection, Map<String, SplitT> map) {
        ArrayList arrayList = new ArrayList();
        Iterator<String> it = collection.iterator();
        while (it.hasNext()) {
            SplitT splitt = map.get(it.next());
            if (splitt != null) {
                arrayList.add(splitt);
            }
        }
        return arrayList;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void startFetcher(SplitFetcher<E, SplitT> splitFetcher) {
        this.executors.submit(splitFetcher);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public synchronized SplitFetcher<E, SplitT> createSplitFetcher() {
        if (this.closed) {
            throw new IllegalStateException("The split fetcher manager has closed.");
        }
        SplitReader<E, SplitT> splitReader = this.splitReaderFactory.get();
        int andIncrement = this.fetcherIdGenerator.getAndIncrement();
        SplitFetcher<E, SplitT> splitFetcher = new SplitFetcher<>(andIncrement, this.elementsQueue, splitReader, this.errorHandler, () -> {
            this.fetchers.remove(Integer.valueOf(andIncrement));
            this.elementsQueue.notifyAvailable();
        }, this.splitFinishedHook, this.allowUnalignedSourceSplits);
        this.fetchers.put(Integer.valueOf(andIncrement), splitFetcher);
        return splitFetcher;
    }

    public boolean maybeShutdownFinishedFetchers() {
        Iterator<Map.Entry<Integer, SplitFetcher<E, SplitT>>> it = this.fetchers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, SplitFetcher<E, SplitT>> next = it.next();
            SplitFetcher<E, SplitT> value = next.getValue();
            if (value.isIdle()) {
                LOG.info("Closing splitFetcher {} because it is idle.", next.getKey());
                value.shutdown();
                it.remove();
            }
        }
        return this.fetchers.isEmpty();
    }

    public synchronized void close(long j) throws Exception {
        this.closed = true;
        this.fetchers.values().forEach((v0) -> {
            v0.shutdown();
        });
        this.executors.shutdown();
        if (this.executors.awaitTermination(j, TimeUnit.MILLISECONDS)) {
            return;
        }
        LOG.warn("Failed to close the source reader in {} ms. There are still {} split fetchers running", Long.valueOf(j), Integer.valueOf(this.fetchers.size()));
    }

    public void checkErrors() {
        if (this.uncaughtFetcherException.get() != null) {
            throw new RuntimeException("One or more fetchers have encountered exception", this.uncaughtFetcherException.get());
        }
    }

    @VisibleForTesting
    public int getNumAliveFetchers() {
        return this.fetchers.size();
    }
}
