package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.bounded;

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceSplit;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.core.io.InputStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReader.class */
/**
 * A source reader that consumes its assigned splits strictly one at a time.
 *
 * <p>At most one Beam {@link Source.Reader} is active at any moment. Every record it yields is
 * wrapped in a {@link WindowedValue} in the {@link GlobalWindow} with {@link PaneInfo#NO_FIRING}
 * and handed to Flink. When the active reader is exhausted its split is finished and another
 * split is requested from the context.
 *
 * @param <T> the element type produced by the underlying Beam readers
 */
public class FlinkBoundedSourceReader<T> extends FlinkSourceReaderBase<T, WindowedValue<T>> {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkBoundedSourceReader.class);

    /** Sentinel stored in {@link #currentSplitId} while no split is being read. */
    private static final int NO_SPLIT_ID = -1;

    /** The reader currently being drained, or {@code null} when no split is active. */
    @Nullable
    private Source.Reader<T> currentReader;

    /** Id of the split behind {@link #currentReader}, or {@link #NO_SPLIT_ID} when there is none. */
    private int currentSplitId = NO_SPLIT_ID;

    /**
     * Creates a reader; all arguments are forwarded unchanged to the base class.
     *
     * @param str identifier passed to the base class (NOTE(review): presumably the step name -
     *     confirm against {@code FlinkSourceReaderBase})
     * @param sourceReaderContext the Flink reader context
     * @param pipelineOptions the pipeline options later used to create Beam readers
     * @param function optional function passed to the base class (NOTE(review): presumably the
     *     timestamp extractor consulted by {@link #pollNext} - confirm against the base class)
     */
    public FlinkBoundedSourceReader(String str, SourceReaderContext sourceReaderContext, PipelineOptions pipelineOptions, @Nullable Function<WindowedValue<T>, Long> function) {
        super(str, sourceReaderContext, pipelineOptions, function);
    }

    /**
     * Test-only variant of the public constructor that additionally forwards a caller-supplied
     * executor to the base class.
     *
     * @param str identifier passed to the base class
     * @param sourceReaderContext the Flink reader context
     * @param pipelineOptions the pipeline options later used to create Beam readers
     * @param scheduledExecutorService executor handed to the base class
     * @param function optional function passed to the base class
     */
    @VisibleForTesting
    protected FlinkBoundedSourceReader(String str, SourceReaderContext sourceReaderContext, PipelineOptions pipelineOptions, ScheduledExecutorService scheduledExecutorService, @Nullable Function<WindowedValue<T>, Long> function) {
        super(str, scheduledExecutorService, sourceReaderContext, pipelineOptions, function);
    }

    /**
     * Builds the checkpoint state for a reader: a split with the given id wrapping the source
     * the reader currently reports via {@code getCurrentSource()}.
     *
     * @param i the split id to record in the checkpoint
     * @param readerAndOutput the tracked reader whose current source is captured
     * @return a new split describing the reader's current source
     */
    @Override
    protected FlinkSourceSplit<T> getReaderCheckpoint(int i, FlinkSourceReaderBase<T, WindowedValue<T>>.ReaderAndOutput readerAndOutput) throws CoderException {
        return new FlinkSourceSplit<>(i, readerAndOutput.reader.getCurrentSource());
    }

    /**
     * Creates a Beam reader for the given split using this reader's pipeline options.
     *
     * @param flinkSourceSplit the split whose Beam source should be read
     * @return a reader over the split's Beam source
     * @throws IOException if the Beam source fails to create the reader
     */
    @Override
    protected Source.Reader<T> createReader(@Nonnull FlinkSourceSplit<T> flinkSourceSplit) throws IOException {
        return flinkSourceSplit.getBeamSplitSource().createReader(this.pipelineOptions);
    }

    /**
     * Emits at most one record from the active reader.
     *
     * <p>If no reader is active, a split is requested (only when no split id is recorded) and
     * the next non-empty reader is started. When no reader can be started, the result is
     * {@link InputStatus#NOTHING_AVAILABLE}, unless no more splits will arrive and the idle
     * timeout check passes, in which case it is {@link InputStatus#END_OF_INPUT}. Otherwise the
     * current record is collected and the reader is advanced; when it cannot advance, its split
     * is finished and a new split is requested.
     *
     * @param readerOutput the Flink output that receives the record and watermarks
     * @return the status telling Flink whether more data may be available
     * @throws Exception if a previously recorded failure is rethrown or reading fails
     */
    @Override
    public InputStatus pollNext(ReaderOutput<WindowedValue<T>> readerOutput) throws Exception {
        checkExceptionAndMaybeThrow();

        if (this.currentReader == null && this.currentSplitId == NO_SPLIT_ID) {
            this.context.sendSplitRequest();
        }

        if (this.currentReader == null && !moveToNextNonEmptyReader()) {
            // Nothing is readable right now; decide between "wait" and "done".
            if (noMoreSplits()) {
                readerOutput.emitWatermark(Watermark.MAX_WATERMARK);
                if (checkIdleTimeoutAndMaybeStartCountdown()) {
                    LOG.info("All splits have finished reading, and idle time {} ms has passed.", this.idleTimeoutMs);
                    return InputStatus.END_OF_INPUT;
                }
            }
            return InputStatus.NOTHING_AVAILABLE;
        }

        final Source.Reader<T> reader = this.currentReader;
        if (reader == null) {
            throw new IllegalArgumentException("If we reach here, the current beam reader should not be null");
        }

        final WindowedValue<T> windowedValue =
                WindowedValue.of(reader.getCurrent(), reader.getCurrentTimestamp(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
        if (this.timestampExtractor == null) {
            readerOutput.collect(windowedValue);
        } else {
            final long timestamp = this.timestampExtractor.apply(windowedValue);
            readerOutput.collect(windowedValue, timestamp);
        }
        this.numRecordsInCounter.inc();

        if (!this.invocationUtil.invokeAdvance(reader)) {
            // The active reader is exhausted: close out its split and ask for another.
            finishSplit(this.currentSplitId);
            LOG.debug("Finished reading from {}", this.currentSplitId);
            this.currentReader = null;
            this.currentSplitId = NO_SPLIT_ID;
            this.context.sendSplitRequest();
        }
        return InputStatus.MORE_AVAILABLE;
    }

    /**
     * Reports availability while readers are alive.
     *
     * @return the base class's {@code AVAILABLE_NOW} future, unconditionally
     */
    @Override
    protected CompletableFuture<Void> isAvailableForAliveReaders() {
        return AVAILABLE_NOW;
    }

    /**
     * Starts readers for pending splits until one reports data on start, and makes it the
     * active reader. Splits whose reader has no data on start are finished immediately.
     *
     * @return {@code true} if an active reader was installed, {@code false} if no further reader
     *     could be created
     * @throws IOException if creating or starting a reader fails
     */
    private boolean moveToNextNonEmptyReader() throws IOException {
        Optional<FlinkSourceReaderBase<T, WindowedValue<T>>.ReaderAndOutput> candidate = createAndTrackNextReader();
        while (candidate.isPresent()) {
            final FlinkSourceReaderBase<T, WindowedValue<T>>.ReaderAndOutput next = candidate.get();
            if (this.invocationUtil.invokeStart(next.reader)) {
                this.currentSplitId = Integer.parseInt(next.splitId);
                this.currentReader = next.reader;
                return true;
            }
            // Empty split: mark it finished and try the next one.
            finishSplit(Integer.parseInt(next.splitId));
            candidate = createAndTrackNextReader();
        }
        return false;
    }
}
