package org.apache.flink.streaming.api.operators;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.eventtime.WatermarkAlignmentParams;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.runtime.source.event.IsProcessingBacklogEvent;
import org.apache.flink.runtime.source.event.NoMoreSplitsEvent;
import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
import org.apache.flink.runtime.source.event.ReportedWatermarkEvent;
import org.apache.flink.runtime.source.event.RequestSplitEvent;
import org.apache.flink.runtime.source.event.SourceEventWrapper;
import org.apache.flink.runtime.source.event.WatermarkAlignmentEvent;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.source.TimestampsAndWatermarks;
import org.apache.flink.streaming.api.operators.util.PausableRelativeClock;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.DataInputStatus;
import org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributesBuilder;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.UserCodeClassLoader;
import org.apache.flink.util.function.FunctionWithException;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/api/operators/SourceOperator.class */
public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStreamOperator<OUT> implements OperatorEventHandler, PushingAsyncDataInput<OUT>, TimestampsAndWatermarks.WatermarkUpdateListener {
    private static final long serialVersionUID = 1405537676017904695L;
    static final ListStateDescriptor<byte[]> SPLITS_STATE_DESC;
    private final FunctionWithException<SourceReaderContext, SourceReader<OUT, SplitT>, Exception> readerFactory;
    private final SimpleVersionedSerializer<SplitT> splitSerializer;
    private final OperatorEventGateway operatorEventGateway;
    private final WatermarkStrategy<OUT> watermarkStrategy;
    private final WatermarkAlignmentParams watermarkAlignmentParams;
    private final Configuration configuration;
    private final String localHostname;
    private final boolean emitProgressiveWatermarks;
    private SourceReader<OUT, SplitT> sourceReader;
    private ReaderOutput<OUT> currentMainOutput;
    private PushingAsyncDataInput.DataOutput<OUT> lastInvokedOutput;
    private ListState<SplitT> readerState;
    private TimestampsAndWatermarks<OUT> eventTimeLogic;
    private OperatingMode operatingMode;
    private InternalSourceReaderMetricGroup sourceMetricGroup;

    @Nullable
    private LatencyMarkerEmitter<OUT> latencyMarkerEmitter;
    private final boolean allowUnalignedSourceSplits;
    private final StreamTask.CanEmitBatchOfRecordsChecker canEmitBatchOfRecords;
    private transient PausableRelativeClock mainInputActivityClock;
    static final /* synthetic */ boolean $assertionsDisabled;
    private long latestWatermark = Watermark.UNINITIALIZED.getTimestamp();
    private boolean idle = false;
    private final CompletableFuture<Void> finished = new CompletableFuture<>();
    private final SourceOperatorAvailabilityHelper availabilityHelper = new SourceOperatorAvailabilityHelper();
    private final List<SplitT> splitsToInitializeOutput = new ArrayList();
    private final Map<String, Long> splitCurrentWatermarks = new HashMap();
    private final Set<String> currentlyPausedSplits = new HashSet();
    private long currentMaxDesiredWatermark = Watermark.MAX_WATERMARK.getTimestamp();
    private CompletableFuture<Void> waitingForAlignmentFuture = CompletableFuture.completedFuture(null);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.streaming.api.operators.SourceOperator$2, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/streaming/api/operators/SourceOperator$2.class */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$streaming$api$operators$SourceOperator$OperatingMode;
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$core$io$InputStatus = new int[InputStatus.values().length];

        static {
            try {
                $SwitchMap$org$apache$flink$core$io$InputStatus[InputStatus.MORE_AVAILABLE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$core$io$InputStatus[InputStatus.NOTHING_AVAILABLE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$flink$core$io$InputStatus[InputStatus.END_OF_INPUT.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            $SwitchMap$org$apache$flink$streaming$api$operators$SourceOperator$OperatingMode = new int[OperatingMode.values().length];
            try {
                $SwitchMap$org$apache$flink$streaming$api$operators$SourceOperator$OperatingMode[OperatingMode.WAITING_FOR_ALIGNMENT.ordinal()] = 1;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$api$operators$SourceOperator$OperatingMode[OperatingMode.OUTPUT_NOT_INITIALIZED.ordinal()] = 2;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$api$operators$SourceOperator$OperatingMode[OperatingMode.READING.ordinal()] = 3;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$api$operators$SourceOperator$OperatingMode[OperatingMode.SOURCE_STOPPED.ordinal()] = 4;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$api$operators$SourceOperator$OperatingMode[OperatingMode.SOURCE_DRAINED.ordinal()] = 5;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$api$operators$SourceOperator$OperatingMode[OperatingMode.DATA_FINISHED.ordinal()] = 6;
            } catch (NoSuchFieldError e9) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/operators/SourceOperator$OperatingMode.class */
    public enum OperatingMode {
        READING,
        WAITING_FOR_ALIGNMENT,
        OUTPUT_NOT_INITIALIZED,
        SOURCE_DRAINED,
        SOURCE_STOPPED,
        DATA_FINISHED
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/SourceOperator$SourceOperatorAvailabilityHelper.class */
    private static class SourceOperatorAvailabilityHelper {
        private final CompletableFuture<Void> forcedStopFuture;
        private final MultipleFuturesAvailabilityHelper availabilityHelper;

        private SourceOperatorAvailabilityHelper() {
            this.forcedStopFuture = new CompletableFuture<>();
            this.availabilityHelper = new MultipleFuturesAvailabilityHelper(2);
            this.availabilityHelper.anyOf(0, this.forcedStopFuture);
        }

        public CompletableFuture<?> update(CompletableFuture<Void> completableFuture) {
            if (completableFuture == AvailabilityProvider.AVAILABLE || completableFuture.isDone()) {
                return AvailabilityProvider.AVAILABLE;
            }
            this.availabilityHelper.resetToUnAvailable();
            this.availabilityHelper.anyOf(0, this.forcedStopFuture);
            this.availabilityHelper.anyOf(1, completableFuture);
            return this.availabilityHelper.getAvailableFuture();
        }

        public void forceStop() {
            this.forcedStopFuture.complete(null);
        }
    }

    public SourceOperator(FunctionWithException<SourceReaderContext, SourceReader<OUT, SplitT>, Exception> functionWithException, OperatorEventGateway operatorEventGateway, SimpleVersionedSerializer<SplitT> simpleVersionedSerializer, WatermarkStrategy<OUT> watermarkStrategy, ProcessingTimeService processingTimeService, Configuration configuration, String str, boolean z, StreamTask.CanEmitBatchOfRecordsChecker canEmitBatchOfRecordsChecker) {
        this.readerFactory = (FunctionWithException) Preconditions.checkNotNull(functionWithException);
        this.operatorEventGateway = (OperatorEventGateway) Preconditions.checkNotNull(operatorEventGateway);
        this.splitSerializer = (SimpleVersionedSerializer) Preconditions.checkNotNull(simpleVersionedSerializer);
        this.watermarkStrategy = (WatermarkStrategy) Preconditions.checkNotNull(watermarkStrategy);
        this.processingTimeService = processingTimeService;
        this.configuration = (Configuration) Preconditions.checkNotNull(configuration);
        this.localHostname = (String) Preconditions.checkNotNull(str);
        this.emitProgressiveWatermarks = z;
        this.operatingMode = OperatingMode.OUTPUT_NOT_INITIALIZED;
        this.watermarkAlignmentParams = watermarkStrategy.getAlignmentParameters();
        this.allowUnalignedSourceSplits = ((Boolean) configuration.get(PipelineOptions.ALLOW_UNALIGNED_SOURCE_SPLITS)).booleanValue();
        this.canEmitBatchOfRecords = (StreamTask.CanEmitBatchOfRecordsChecker) Preconditions.checkNotNull(canEmitBatchOfRecordsChecker);
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.SetupableStreamOperator
    public void setup(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Output<StreamRecord<OUT>> output) {
        super.setup(streamTask, streamConfig, output);
        initSourceMetricGroup();
    }

    @VisibleForTesting
    protected void initSourceMetricGroup() {
        this.sourceMetricGroup = InternalSourceReaderMetricGroup.wrap(getMetricGroup());
    }

    public void initReader() throws Exception {
        if (this.sourceReader != null) {
            return;
        }
        final int indexOfThisSubtask = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
        this.sourceReader = (SourceReader) this.readerFactory.apply(new SourceReaderContext() { // from class: org.apache.flink.streaming.api.operators.SourceOperator.1
            public SourceReaderMetricGroup metricGroup() {
                return SourceOperator.this.sourceMetricGroup;
            }

            public Configuration getConfiguration() {
                return SourceOperator.this.configuration;
            }

            public String getLocalHostName() {
                return SourceOperator.this.localHostname;
            }

            public int getIndexOfSubtask() {
                return indexOfThisSubtask;
            }

            public void sendSplitRequest() {
                SourceOperator.this.operatorEventGateway.sendEventToCoordinator(new RequestSplitEvent(getLocalHostName()));
            }

            public void sendSourceEventToCoordinator(SourceEvent sourceEvent) {
                SourceOperator.this.operatorEventGateway.sendEventToCoordinator(new SourceEventWrapper(sourceEvent));
            }

            public UserCodeClassLoader getUserCodeClassLoader() {
                return new UserCodeClassLoader() { // from class: org.apache.flink.streaming.api.operators.SourceOperator.1.1
                    public ClassLoader asClassLoader() {
                        return SourceOperator.this.getRuntimeContext().getUserCodeClassLoader();
                    }

                    public void registerReleaseHookIfAbsent(String str, Runnable runnable) {
                        SourceOperator.this.getRuntimeContext().registerUserCodeClassLoaderReleaseHookIfAbsent(str, runnable);
                    }
                };
            }

            public int currentParallelism() {
                return SourceOperator.this.getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
            }
        });
    }

    public InternalSourceReaderMetricGroup getSourceMetricGroup() {
        return this.sourceMetricGroup;
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void open() throws Exception {
        this.mainInputActivityClock = new PausableRelativeClock(getProcessingTimeService().getClock());
        TaskIOMetricGroup iOMetricGroup = getContainingTask().getEnvironment().getMetricGroup().getIOMetricGroup();
        iOMetricGroup.registerBackPressureListener(this.mainInputActivityClock);
        initReader();
        if (this.emitProgressiveWatermarks) {
            this.eventTimeLogic = TimestampsAndWatermarks.createProgressiveEventTimeLogic(this.watermarkStrategy, this.sourceMetricGroup, getProcessingTimeService(), getExecutionConfig().getAutoWatermarkInterval(), this.mainInputActivityClock, getProcessingTimeService().getClock(), iOMetricGroup);
        } else {
            this.eventTimeLogic = TimestampsAndWatermarks.createNoOpEventTimeLogic(this.watermarkStrategy, this.sourceMetricGroup, this.mainInputActivityClock);
        }
        List iterableToList = CollectionUtil.iterableToList((Iterable) this.readerState.get());
        if (!iterableToList.isEmpty()) {
            LOG.info("Restoring state for {} split(s) to reader.", Integer.valueOf(iterableToList.size()));
            this.splitsToInitializeOutput.addAll(iterableToList);
            this.sourceReader.addSplits(iterableToList);
        }
        registerReader();
        this.sourceMetricGroup.idlingStarted();
        this.sourceReader.start();
        this.eventTimeLogic.startPeriodicWatermarkEmits();
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void finish() throws Exception {
        stopInternalServices();
        super.finish();
        this.finished.complete(null);
    }

    private void stopInternalServices() {
        if (this.eventTimeLogic != null) {
            this.eventTimeLogic.stopPeriodicWatermarkEmits();
        }
        if (this.latencyMarkerEmitter != null) {
            this.latencyMarkerEmitter.close();
        }
    }

    public CompletableFuture<Void> stop(StopMode stopMode) {
        switch (AnonymousClass2.$SwitchMap$org$apache$flink$streaming$api$operators$SourceOperator$OperatingMode[this.operatingMode.ordinal()]) {
            case 1:
            case InternalTimerServiceSerializationProxy.VERSION /* 2 */:
            case 3:
                this.operatingMode = stopMode == StopMode.DRAIN ? OperatingMode.SOURCE_DRAINED : OperatingMode.SOURCE_STOPPED;
                this.availabilityHelper.forceStop();
                if (this.operatingMode == OperatingMode.SOURCE_STOPPED) {
                    stopInternalServices();
                    this.finished.complete(null);
                    return this.finished;
                }
                break;
        }
        return this.finished;
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void close() throws Exception {
        getContainingTask().getEnvironment().getMetricGroup().getIOMetricGroup().unregisterBackPressureListener(this.mainInputActivityClock);
        if (this.sourceReader != null) {
            this.sourceReader.close();
        }
        super.close();
    }

    @Override // org.apache.flink.streaming.runtime.io.PushingAsyncDataInput
    public DataInputStatus emitNext(PushingAsyncDataInput.DataOutput<OUT> dataOutput) throws Exception {
        InputStatus pollNext;
        if (!$assertionsDisabled && this.lastInvokedOutput != dataOutput && this.lastInvokedOutput != null && this.operatingMode != OperatingMode.DATA_FINISHED) {
            throw new AssertionError();
        }
        if (this.operatingMode != OperatingMode.READING) {
            return emitNextNotReading(dataOutput);
        }
        do {
            pollNext = this.sourceReader.pollNext(this.currentMainOutput);
            if (pollNext != InputStatus.MORE_AVAILABLE || !this.canEmitBatchOfRecords.check()) {
                break;
            }
        } while (!shouldWaitForAlignment());
        return convertToInternalStatus(pollNext);
    }

    private DataInputStatus emitNextNotReading(PushingAsyncDataInput.DataOutput<OUT> dataOutput) throws Exception {
        switch (AnonymousClass2.$SwitchMap$org$apache$flink$streaming$api$operators$SourceOperator$OperatingMode[this.operatingMode.ordinal()]) {
            case 1:
                Preconditions.checkState(!this.waitingForAlignmentFuture.isDone());
                Preconditions.checkState(shouldWaitForAlignment());
                return convertToInternalStatus(InputStatus.NOTHING_AVAILABLE);
            case InternalTimerServiceSerializationProxy.VERSION /* 2 */:
                if (this.watermarkAlignmentParams.isEnabled()) {
                    this.processingTimeService.scheduleWithFixedDelay(j -> {
                        emitLatestWatermark();
                    }, this.watermarkAlignmentParams.getUpdateInterval(), this.watermarkAlignmentParams.getUpdateInterval());
                }
                initializeMainOutput(dataOutput);
                return convertToInternalStatus(this.sourceReader.pollNext(this.currentMainOutput));
            case 3:
            default:
                throw new IllegalStateException("Unknown operating mode: " + this.operatingMode);
            case 4:
                this.operatingMode = OperatingMode.DATA_FINISHED;
                this.sourceMetricGroup.idlingStarted();
                return DataInputStatus.STOPPED;
            case 5:
                this.operatingMode = OperatingMode.DATA_FINISHED;
                this.sourceMetricGroup.idlingStarted();
                return DataInputStatus.END_OF_DATA;
            case 6:
                if (this.watermarkAlignmentParams.isEnabled()) {
                    this.latestWatermark = Watermark.MAX_WATERMARK.getTimestamp();
                    emitLatestWatermark();
                }
                this.sourceMetricGroup.idlingStarted();
                return DataInputStatus.END_OF_INPUT;
        }
    }

    private void initializeMainOutput(PushingAsyncDataInput.DataOutput<OUT> dataOutput) {
        this.currentMainOutput = this.eventTimeLogic.createMainOutput(dataOutput, this);
        initializeLatencyMarkerEmitter(dataOutput);
        this.lastInvokedOutput = dataOutput;
        createOutputForSplits(this.splitsToInitializeOutput);
        this.operatingMode = OperatingMode.READING;
    }

    private void initializeLatencyMarkerEmitter(PushingAsyncDataInput.DataOutput<OUT> dataOutput) {
        long latencyTrackingInterval = getExecutionConfig().isLatencyTrackingConfigured() ? getExecutionConfig().getLatencyTrackingInterval() : ((Duration) getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration().get(MetricOptions.LATENCY_INTERVAL)).toMillis();
        if (latencyTrackingInterval > 0) {
            ProcessingTimeService processingTimeService = getProcessingTimeService();
            Objects.requireNonNull(dataOutput);
            this.latencyMarkerEmitter = new LatencyMarkerEmitter<>(processingTimeService, dataOutput::emitLatencyMarker, latencyTrackingInterval, getOperatorID(), getRuntimeContext().getTaskInfo().getIndexOfThisSubtask());
        }
    }

    private DataInputStatus convertToInternalStatus(InputStatus inputStatus) {
        switch (AnonymousClass2.$SwitchMap$org$apache$flink$core$io$InputStatus[inputStatus.ordinal()]) {
            case 1:
                return DataInputStatus.MORE_AVAILABLE;
            case InternalTimerServiceSerializationProxy.VERSION /* 2 */:
                this.sourceMetricGroup.idlingStarted();
                return DataInputStatus.NOTHING_AVAILABLE;
            case 3:
                this.operatingMode = OperatingMode.DATA_FINISHED;
                this.sourceMetricGroup.idlingStarted();
                return DataInputStatus.END_OF_DATA;
            default:
                throw new IllegalArgumentException("Unknown input status: " + inputStatus);
        }
    }

    private void emitLatestWatermark() {
        Preconditions.checkState(this.currentMainOutput != null);
        if (this.latestWatermark == Watermark.UNINITIALIZED.getTimestamp()) {
            return;
        }
        this.operatorEventGateway.sendEventToCoordinator(new ReportedWatermarkEvent(this.idle ? Watermark.MAX_WATERMARK.getTimestamp() : this.latestWatermark));
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.CheckpointedStreamOperator
    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        long checkpointId = stateSnapshotContext.getCheckpointId();
        LOG.debug("Taking a snapshot for checkpoint {}", Long.valueOf(checkpointId));
        this.readerState.update(this.sourceReader.snapshotState(checkpointId));
    }

    public CompletableFuture<?> getAvailableFuture() {
        switch (AnonymousClass2.$SwitchMap$org$apache$flink$streaming$api$operators$SourceOperator$OperatingMode[this.operatingMode.ordinal()]) {
            case 1:
                return this.availabilityHelper.update(this.waitingForAlignmentFuture);
            case InternalTimerServiceSerializationProxy.VERSION /* 2 */:
            case 3:
                return this.availabilityHelper.update(this.sourceReader.isAvailable());
            case 4:
            case 5:
            case 6:
                return AvailabilityProvider.AVAILABLE;
            default:
                throw new IllegalStateException("Unknown operating mode: " + this.operatingMode);
        }
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.CheckpointedStreamOperator
    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        super.initializeState(stateInitializationContext);
        this.readerState = new SimpleVersionedListState(stateInitializationContext.getOperatorStateStore().getListState(SPLITS_STATE_DESC), this.splitSerializer);
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator
    public void notifyCheckpointComplete(long j) throws Exception {
        super.notifyCheckpointComplete(j);
        this.sourceReader.notifyCheckpointComplete(j);
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator
    public void notifyCheckpointAborted(long j) throws Exception {
        super.notifyCheckpointAborted(j);
        this.sourceReader.notifyCheckpointAborted(j);
    }

    public void handleOperatorEvent(OperatorEvent operatorEvent) {
        if (operatorEvent instanceof WatermarkAlignmentEvent) {
            updateMaxDesiredWatermark((WatermarkAlignmentEvent) operatorEvent);
            checkWatermarkAlignment();
            checkSplitWatermarkAlignment();
        } else {
            if (operatorEvent instanceof AddSplitEvent) {
                handleAddSplitsEvent((AddSplitEvent) operatorEvent);
                return;
            }
            if (operatorEvent instanceof SourceEventWrapper) {
                this.sourceReader.handleSourceEvents(((SourceEventWrapper) operatorEvent).getSourceEvent());
                return;
            }
            if (operatorEvent instanceof NoMoreSplitsEvent) {
                this.sourceReader.notifyNoMoreSplits();
            } else {
                if (!(operatorEvent instanceof IsProcessingBacklogEvent)) {
                    throw new IllegalStateException("Received unexpected operator event " + operatorEvent);
                }
                if (this.eventTimeLogic != null) {
                    this.eventTimeLogic.emitImmediateWatermark(System.currentTimeMillis());
                }
                this.output.emitRecordAttributes(new RecordAttributesBuilder(Collections.emptyList()).setBacklog(((IsProcessingBacklogEvent) operatorEvent).isProcessingBacklog()).build());
            }
        }
    }

    private void handleAddSplitsEvent(AddSplitEvent<SplitT> addSplitEvent) {
        try {
            List<SplitT> splits = addSplitEvent.splits(this.splitSerializer);
            if (this.operatingMode == OperatingMode.OUTPUT_NOT_INITIALIZED) {
                this.splitsToInitializeOutput.addAll(splits);
            } else {
                createOutputForSplits(splits);
            }
            this.sourceReader.addSplits(splits);
        } catch (IOException e) {
            throw new FlinkRuntimeException("Failed to deserialize the splits.", e);
        }
    }

    private void createOutputForSplits(List<SplitT> list) {
        Iterator<SplitT> it = list.iterator();
        while (it.hasNext()) {
            this.currentMainOutput.createOutputForSplit(it.next().splitId());
        }
    }

    private void updateMaxDesiredWatermark(WatermarkAlignmentEvent watermarkAlignmentEvent) {
        this.currentMaxDesiredWatermark = watermarkAlignmentEvent.getMaxWatermark();
        this.sourceMetricGroup.updateMaxDesiredWatermark(this.currentMaxDesiredWatermark);
    }

    @Override // org.apache.flink.streaming.api.operators.source.TimestampsAndWatermarks.WatermarkUpdateListener
    public void updateIdle(boolean z) {
        this.idle = z;
    }

    @Override // org.apache.flink.streaming.api.operators.source.TimestampsAndWatermarks.WatermarkUpdateListener
    public void updateCurrentEffectiveWatermark(long j) {
        this.latestWatermark = j;
        checkWatermarkAlignment();
    }

    @Override // org.apache.flink.streaming.api.operators.source.TimestampsAndWatermarks.WatermarkUpdateListener
    public void updateCurrentSplitWatermark(String str, long j) {
        this.splitCurrentWatermarks.put(str, Long.valueOf(j));
        if (j <= this.currentMaxDesiredWatermark || this.currentlyPausedSplits.contains(str)) {
            return;
        }
        pauseOrResumeSplits(Collections.singletonList(str), Collections.emptyList());
        this.currentlyPausedSplits.add(str);
    }

    @Override // org.apache.flink.streaming.api.operators.source.TimestampsAndWatermarks.WatermarkUpdateListener
    public void splitFinished(String str) {
        this.splitCurrentWatermarks.remove(str);
    }

    private void checkSplitWatermarkAlignment() {
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        this.splitCurrentWatermarks.forEach((str, l) -> {
            if (l.longValue() > this.currentMaxDesiredWatermark) {
                arrayList.add(str);
            } else if (this.currentlyPausedSplits.contains(str)) {
                arrayList2.add(str);
            }
        });
        arrayList.removeAll(this.currentlyPausedSplits);
        if (arrayList.isEmpty() && arrayList2.isEmpty()) {
            return;
        }
        pauseOrResumeSplits(arrayList, arrayList2);
        this.currentlyPausedSplits.addAll(arrayList);
        Set<String> set = this.currentlyPausedSplits;
        Objects.requireNonNull(set);
        arrayList2.forEach((v1) -> {
            r1.remove(v1);
        });
    }

    private void pauseOrResumeSplits(Collection<String> collection, Collection<String> collection2) {
        try {
            this.sourceReader.pauseOrResumeSplits(collection, collection2);
            this.eventTimeLogic.pauseOrResumeSplits(collection, collection2);
        } catch (UnsupportedOperationException e) {
            if (!this.allowUnalignedSourceSplits) {
                throw e;
            }
        }
    }

    private void checkWatermarkAlignment() {
        if (this.operatingMode == OperatingMode.READING) {
            Preconditions.checkState(this.waitingForAlignmentFuture.isDone());
            if (shouldWaitForAlignment()) {
                this.operatingMode = OperatingMode.WAITING_FOR_ALIGNMENT;
                this.waitingForAlignmentFuture = new CompletableFuture<>();
                this.mainInputActivityClock.pause();
                return;
            }
            return;
        }
        if (this.operatingMode == OperatingMode.WAITING_FOR_ALIGNMENT) {
            Preconditions.checkState(!this.waitingForAlignmentFuture.isDone());
            if (shouldWaitForAlignment()) {
                return;
            }
            this.operatingMode = OperatingMode.READING;
            this.waitingForAlignmentFuture.complete(null);
            this.mainInputActivityClock.unPause();
        }
    }

    private boolean shouldWaitForAlignment() {
        return this.currentMaxDesiredWatermark < this.latestWatermark;
    }

    private void registerReader() {
        this.operatorEventGateway.sendEventToCoordinator(new ReaderRegistrationEvent(getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(), this.localHostname));
    }

    @VisibleForTesting
    public SourceReader<OUT, SplitT> getSourceReader() {
        return this.sourceReader;
    }

    @VisibleForTesting
    ListState<SplitT> getReaderState() {
        return this.readerState;
    }

    static {
        $assertionsDisabled = !SourceOperator.class.desiredAssertionStatus();
        SPLITS_STATE_DESC = new ListStateDescriptor<>("SourceReaderState", BytePrimitiveArraySerializer.INSTANCE);
    }
}
