package org.apache.beam.runners.flink.translation.wrappers.streaming.io;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.runners.flink.translation.utils.Workarounds;
import org.apache.beam.runners.flink.translation.wrappers.streaming.ProcessingTimeCallbackCompat;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.ValueWithRecordId;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.class */
public class UnboundedSourceWrapper<OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark> extends RichParallelSourceFunction<WindowedValue<ValueWithRecordId<OutputT>>> implements ProcessingTimeCallbackCompat, BeamStoppableFunction, CheckpointListener, CheckpointedFunction {
    private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceWrapper.class);
    private final String stepName;
    private final SerializablePipelineOptions serializedOptions;
    private final boolean isConvertedBoundedSource;
    private final KvCoder<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT> checkpointCoder;
    private final List<? extends UnboundedSource<OutputT, CheckpointMarkT>> splitSources;
    private final long idleTimeoutMs;
    private transient List<UnboundedSource<OutputT, CheckpointMarkT>> localSplitSources;
    private transient List<UnboundedSource.UnboundedReader<OutputT>> localReaders;
    private transient StreamingRuntimeContext runtimeContext;
    private transient SourceFunction.SourceContext<WindowedValue<ValueWithRecordId<OutputT>>> context;
    private transient LinkedHashMap<Long, List<CheckpointMarkT>> pendingCheckpoints;
    private static final int MAX_NUMBER_PENDING_CHECKPOINTS = 32;
    private transient ListState<KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT>> stateForCheckpoint;
    private transient boolean maxWatermarkReached;
    private transient FlinkMetricContainer metricContainer;
    private volatile boolean isRunning = true;
    private transient boolean isRestored = false;

    public UnboundedSourceWrapper(String str, PipelineOptions pipelineOptions, UnboundedSource<OutputT, CheckpointMarkT> unboundedSource, int i) throws Exception {
        this.stepName = str;
        this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);
        this.isConvertedBoundedSource = unboundedSource instanceof UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
        if (unboundedSource.requiresDeduping()) {
            LOG.warn("Source {} requires deduping but Flink runner doesn't support this yet.", unboundedSource);
        }
        Coder checkpointMarkCoder = unboundedSource.getCheckpointMarkCoder();
        if (checkpointMarkCoder == null) {
            LOG.info("No CheckpointMarkCoder specified for this source. Won't create snapshots.");
            this.checkpointCoder = null;
        } else {
            this.checkpointCoder = KvCoder.of(SerializableCoder.of(new TypeDescriptor<UnboundedSource>() { // from class: org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.1
            }), checkpointMarkCoder);
        }
        this.splitSources = unboundedSource.split(i, pipelineOptions);
        this.idleTimeoutMs = ((FlinkPipelineOptions) pipelineOptions.as(FlinkPipelineOptions.class)).getShutdownSourcesAfterIdleMs().longValue();
    }

    public void open(Configuration configuration) throws Exception {
        FileSystems.setDefaultPipelineOptions(this.serializedOptions.get());
        this.runtimeContext = getRuntimeContext();
        this.metricContainer = new FlinkMetricContainer(this.runtimeContext);
        int indexOfThisSubtask = this.runtimeContext.getIndexOfThisSubtask();
        int numberOfParallelSubtasks = this.runtimeContext.getNumberOfParallelSubtasks();
        this.localSplitSources = new ArrayList();
        this.localReaders = new ArrayList();
        this.pendingCheckpoints = new LinkedHashMap<>();
        if (this.isRestored) {
            for (KV kv : (Iterable) this.stateForCheckpoint.get()) {
                this.localSplitSources.add((UnboundedSource) kv.getKey());
                this.localReaders.add(((UnboundedSource) kv.getKey()).createReader(this.serializedOptions.get(), (UnboundedSource.CheckpointMark) kv.getValue()));
            }
        } else {
            for (int i = 0; i < this.splitSources.size(); i++) {
                if (i % numberOfParallelSubtasks == indexOfThisSubtask) {
                    UnboundedSource<OutputT, CheckpointMarkT> unboundedSource = this.splitSources.get(i);
                    UnboundedSource.UnboundedReader<OutputT> createReader = unboundedSource.createReader(this.serializedOptions.get(), (UnboundedSource.CheckpointMark) null);
                    this.localSplitSources.add(unboundedSource);
                    this.localReaders.add(createReader);
                }
            }
        }
        LOG.info("Unbounded Flink Source {}/{} is reading from sources: {}", new Object[]{Integer.valueOf(indexOfThisSubtask + 1), Integer.valueOf(numberOfParallelSubtasks), this.localSplitSources});
    }

    public void run(SourceFunction.SourceContext<WindowedValue<ValueWithRecordId<OutputT>>> sourceContext) throws Exception {
        boolean invokeAdvance;
        this.context = sourceContext;
        ReaderInvocationUtil readerInvocationUtil = new ReaderInvocationUtil(this.stepName, this.serializedOptions.get(), this.metricContainer);
        setNextWatermarkTimer(this.runtimeContext);
        if (this.localReaders.isEmpty()) {
            LOG.info("Number of readers is 0 for this task executor, idle");
        } else if (this.isConvertedBoundedSource) {
            for (int i = 0; i < this.localReaders.size() && this.isRunning; i++) {
                UnboundedSource.UnboundedReader<OutputT> unboundedReader = this.localReaders.get(i);
                synchronized (sourceContext.getCheckpointLock()) {
                    if (readerInvocationUtil.invokeStart(unboundedReader)) {
                        emitElement(sourceContext, unboundedReader);
                    }
                }
                do {
                    synchronized (sourceContext.getCheckpointLock()) {
                        invokeAdvance = readerInvocationUtil.invokeAdvance(unboundedReader);
                        if (invokeAdvance) {
                            emitElement(sourceContext, unboundedReader);
                        }
                    }
                    if (invokeAdvance) {
                    }
                } while (this.isRunning);
            }
        } else {
            int size = this.localReaders.size();
            int i2 = 0;
            for (UnboundedSource.UnboundedReader<OutputT> unboundedReader2 : this.localReaders) {
                synchronized (sourceContext.getCheckpointLock()) {
                    if (readerInvocationUtil.invokeStart(unboundedReader2)) {
                        emitElement(sourceContext, unboundedReader2);
                    }
                }
            }
            boolean z = false;
            while (this.isRunning && !this.maxWatermarkReached) {
                UnboundedSource.UnboundedReader<OutputT> unboundedReader3 = this.localReaders.get(i2);
                synchronized (sourceContext.getCheckpointLock()) {
                    if (readerInvocationUtil.invokeAdvance(unboundedReader3)) {
                        emitElement(sourceContext, unboundedReader3);
                        z = true;
                    }
                }
                i2 = (i2 + 1) % size;
                if (i2 == 0 && !z) {
                    Thread.sleep(50L);
                } else if (i2 == 0) {
                    z = false;
                }
            }
        }
        sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
        finalizeSource();
    }

    private void finalizeSource() {
        long currentTimeMillis = System.currentTimeMillis();
        while (this.isRunning && System.currentTimeMillis() - currentTimeMillis < this.idleTimeoutMs) {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                if (!this.isRunning) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    private void emitElement(SourceFunction.SourceContext<WindowedValue<ValueWithRecordId<OutputT>>> sourceContext, UnboundedSource.UnboundedReader<OutputT> unboundedReader) {
        Object current = unboundedReader.getCurrent();
        byte[] currentRecordId = unboundedReader.getCurrentRecordId();
        sourceContext.collect(WindowedValue.of(new ValueWithRecordId(current, currentRecordId), unboundedReader.getCurrentTimestamp(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
    }

    public void close() throws Exception {
        try {
            if (this.metricContainer != null) {
                this.metricContainer.registerMetricsForPipelineResult();
            }
            super.close();
            if (this.localReaders != null) {
                Iterator<UnboundedSource.UnboundedReader<OutputT>> it = this.localReaders.iterator();
                while (it.hasNext()) {
                    it.next().close();
                }
            }
        } finally {
            Workarounds.deleteStaticCaches();
        }
    }

    public void cancel() {
        this.isRunning = false;
    }

    @Override // org.apache.beam.runners.flink.translation.wrappers.streaming.io.BeamStoppableFunction
    public void stop() {
        this.isRunning = false;
    }

    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        if (!this.isRunning) {
            LOG.debug("snapshotState() called on closed source");
        }
        if (this.checkpointCoder == null) {
            return;
        }
        this.stateForCheckpoint.clear();
        long checkpointId = functionSnapshotContext.getCheckpointId();
        ArrayList arrayList = new ArrayList(this.localSplitSources.size());
        for (int i = 0; i < this.localSplitSources.size(); i++) {
            UnboundedSource<OutputT, CheckpointMarkT> unboundedSource = this.localSplitSources.get(i);
            UnboundedSource.CheckpointMark checkpointMark = this.localReaders.get(i).getCheckpointMark();
            arrayList.add(checkpointMark);
            this.stateForCheckpoint.add(KV.of(unboundedSource, checkpointMark));
        }
        int size = this.pendingCheckpoints.size() - MAX_NUMBER_PENDING_CHECKPOINTS;
        if (size >= 0) {
            Iterator<Long> it = this.pendingCheckpoints.keySet().iterator();
            while (size >= 0) {
                it.next();
                it.remove();
                size--;
            }
        }
        this.pendingCheckpoints.put(Long.valueOf(checkpointId), arrayList);
    }

    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        if (this.checkpointCoder == null) {
            return;
        }
        this.stateForCheckpoint = functionInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor("_default_", new CoderTypeInformation((Coder) this.checkpointCoder, this.serializedOptions).createSerializer(new ExecutionConfig())));
        if (!functionInitializationContext.isRestored()) {
            LOG.info("No restore state for UnboundedSourceWrapper.");
        } else {
            this.isRestored = true;
            LOG.info("Restoring state in the UnboundedSourceWrapper.");
        }
    }

    public void onProcessingTime(long j) {
        if (this.isRunning) {
            synchronized (this.context.getCheckpointLock()) {
                long j2 = Long.MAX_VALUE;
                Iterator<UnboundedSource.UnboundedReader<OutputT>> it = this.localReaders.iterator();
                while (it.hasNext()) {
                    Instant watermark = it.next().getWatermark();
                    if (watermark != null) {
                        j2 = Math.min(watermark.getMillis(), j2);
                    }
                }
                this.context.emitWatermark(new Watermark(j2));
                if (j2 < BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
                    setNextWatermarkTimer(this.runtimeContext);
                } else {
                    this.maxWatermarkReached = true;
                }
            }
        }
    }

    private void setNextWatermarkTimer(StreamingRuntimeContext streamingRuntimeContext) {
        if (this.isRunning) {
            long autoWatermarkInterval = streamingRuntimeContext.getExecutionConfig().getAutoWatermarkInterval();
            synchronized (this.context.getCheckpointLock()) {
                long currentProcessingTime = streamingRuntimeContext.getProcessingTimeService().getCurrentProcessingTime();
                if (currentProcessingTime < Long.MAX_VALUE) {
                    long j = currentProcessingTime + autoWatermarkInterval;
                    if (j < currentProcessingTime) {
                        j = Long.MAX_VALUE;
                    }
                    streamingRuntimeContext.getProcessingTimeService().registerTimer(j, this);
                }
            }
        }
    }

    @VisibleForTesting
    public List<? extends UnboundedSource<OutputT, CheckpointMarkT>> getSplitSources() {
        return this.splitSources;
    }

    @VisibleForTesting
    List<? extends UnboundedSource<OutputT, CheckpointMarkT>> getLocalSplitSources() {
        return this.localSplitSources;
    }

    @VisibleForTesting
    List<UnboundedSource.UnboundedReader<OutputT>> getLocalReaders() {
        return this.localReaders;
    }

    @VisibleForTesting
    boolean isRunning() {
        return this.isRunning;
    }

    @VisibleForTesting
    public void setSourceContext(SourceFunction.SourceContext<WindowedValue<ValueWithRecordId<OutputT>>> sourceContext) {
        this.context = sourceContext;
    }

    public void notifyCheckpointComplete(long j) throws Exception {
        long longValue;
        List<CheckpointMarkT> list = this.pendingCheckpoints.get(Long.valueOf(j));
        if (list != null) {
            Iterator<Long> it = this.pendingCheckpoints.keySet().iterator();
            do {
                longValue = it.next().longValue();
                it.remove();
            } while (longValue != j);
            Iterator<CheckpointMarkT> it2 = list.iterator();
            while (it2.hasNext()) {
                it2.next().finalizeCheckpoint();
            }
        }
    }
}
