/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.testframe.source;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.testframe.source.FromElementsSource;
import org.apache.flink.connector.testframe.source.split.FromElementsSplit;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.metrics.Counter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FromElementsSourceReader<T>
implements SourceReader<T, FromElementsSplit> {
    private static final Logger LOG = LoggerFactory.getLogger(FromElementsSourceReader.class);
    private volatile int emittedNum;
    private volatile boolean isRunning = true;
    private SourceReaderContext context;
    private Integer limitedNum;
    private Boundedness boundedness;
    private volatile boolean checkpointAtLimitedNum = false;
    private final FromElementsSource.ElementsSupplier<T> elementsSupplier;
    private Counter numRecordInCounter;

    public FromElementsSourceReader(Integer limitedNum, FromElementsSource.ElementsSupplier<T> elementsSupplier, Boundedness boundedness, SourceReaderContext context) {
        this.context = context;
        this.emittedNum = 0;
        this.elementsSupplier = elementsSupplier;
        this.limitedNum = limitedNum;
        this.boundedness = boundedness;
        this.numRecordInCounter = context.metricGroup().getIOMetricGroup().getNumRecordsInCounter();
    }

    public void start() {
    }

    public InputStatus pollNext(ReaderOutput<T> output) throws Exception {
        if (this.isRunning && this.emittedNum < this.elementsSupplier.numElements()) {
            if (this.limitedNum == null || this.limitedNum != null && (this.emittedNum < this.limitedNum || this.checkpointAtLimitedNum)) {
                output.collect(this.elementsSupplier.get(this.emittedNum));
                ++this.emittedNum;
                this.numRecordInCounter.inc();
            }
            return InputStatus.MORE_AVAILABLE;
        }
        if (Boundedness.CONTINUOUS_UNBOUNDED.equals((Object)this.boundedness)) {
            return InputStatus.MORE_AVAILABLE;
        }
        return InputStatus.END_OF_INPUT;
    }

    public List<FromElementsSplit> snapshotState(long checkpointId) {
        if (this.limitedNum != null && !this.checkpointAtLimitedNum && this.emittedNum == this.limitedNum) {
            this.checkpointAtLimitedNum = true;
            LOG.info("checkpoint {} is the target checkpoint to be used.", (Object)checkpointId);
        }
        return Arrays.asList(new FromElementsSplit(this.emittedNum));
    }

    public CompletableFuture<Void> isAvailable() {
        return CompletableFuture.completedFuture(null);
    }

    public void addSplits(List<FromElementsSplit> splits) {
        this.emittedNum = splits.get(0).getEmitNum();
        LOG.info("FromElementsSourceReader restores from {}.", (Object)this.emittedNum);
    }

    public void notifyNoMoreSplits() {
    }

    public void close() throws Exception {
        this.isRunning = false;
    }

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        LOG.info("checkpoint {} finished.", (Object)checkpointId);
    }
}

