package org.apache.flink.streaming.api.operators.source;

import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ScheduledFuture;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.streaming.api.operators.source.TimestampsAndWatermarks;
import org.apache.flink.streaming.api.operators.util.PausableRelativeClock;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.RelativeClock;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks.class */
public class ProgressiveTimestampsAndWatermarks<T> implements TimestampsAndWatermarks<T> {
    // Timestamp assigner handed to the main output and to every per-split output.
    private final TimestampAssigner<T> timestampAssigner;
    // Supplier used to create one watermark generator for the main output and one per split.
    private final WatermarkGeneratorSupplier<T> watermarksFactory;
    // Builds the context passed to watermarksFactory from an input-activity clock.
    private final TimestampsAndWatermarks.TimestampsAndWatermarksContextProvider watermarksContextProvider;
    // Used to schedule the periodic watermark emission.
    private final ProcessingTimeService timeService;
    // Emission period in milliseconds; 0 means no periodic emission is scheduled,
    // Long.MAX_VALUE is used when the configured Duration overflows a long.
    private final long periodicWatermarkInterval;
    // Clock given to the generator context of the main output.
    private final RelativeClock mainInputActivityClock;
    // Base clock wrapped by the per-split PausableRelativeClock instances.
    private final Clock clock;
    // Metric group on which per-split clocks are (un)registered as back-pressure listeners.
    private final TaskIOMetricGroup taskIOMetricGroup;

    // Per-split outputs; null until createMainOutput has been called.
    @Nullable
    private SplitLocalOutputs<T> currentPerSplitOutputs;

    // Main reader output; null until createMainOutput has been called.
    @Nullable
    private StreamingReaderOutput<T> currentMainOutput;

    // Handle of the scheduled periodic emission; null while no emission is scheduled.
    @Nullable
    private ScheduledFuture<?> periodicEmitHandle;

    /**
     * Tracks the idleness of two watermark outputs (the main output and the split-local output)
     * that share one underlying output. The underlying output is only marked idle once both
     * wrappers are idle; it is marked active as soon as either wrapper is marked active.
     */
    private static class IdlenessManager {
        private final WatermarkOutput underlyingOutput;
        private final IdlenessAwareWatermarkOutput splitLocalOutput;
        private final IdlenessAwareWatermarkOutput mainOutput;

        /**
         * Creates the manager and its two wrappers, both of which forward to {@code sharedOutput}.
         *
         * @param sharedOutput the output that both wrappers delegate to
         */
        IdlenessManager(WatermarkOutput sharedOutput) {
            underlyingOutput = sharedOutput;
            splitLocalOutput = new IdlenessAwareWatermarkOutput(sharedOutput);
            mainOutput = new IdlenessAwareWatermarkOutput(sharedOutput);
        }

        /** Returns the wrapper intended for the split-local outputs. */
        IdlenessAwareWatermarkOutput getSplitLocalOutput() {
            return splitLocalOutput;
        }

        /** Returns the wrapper intended for the main output. */
        IdlenessAwareWatermarkOutput getMainOutput() {
            return mainOutput;
        }

        /** Marks the shared underlying output idle, but only if both wrappers are idle. */
        void maybeMarkUnderlyingOutputAsIdle() {
            if (!splitLocalOutput.isIdle || !mainOutput.isIdle) {
                // At least one side is still active; the combined output must stay active.
                return;
            }
            underlyingOutput.markIdle();
        }

        /**
         * A {@link WatermarkOutput} that remembers its own idle flag. Watermarks and "active"
         * signals are forwarded directly; "idle" signals are only recorded and then resolved by
         * the enclosing manager. A new instance starts out idle.
         */
        public class IdlenessAwareWatermarkOutput implements WatermarkOutput {
            private final WatermarkOutput delegate;
            private boolean isIdle = true;

            private IdlenessAwareWatermarkOutput(WatermarkOutput delegate) {
                this.delegate = delegate;
            }

            /** Forwards the watermark and then records this output as no longer idle. */
            @Override
            public void emitWatermark(Watermark watermark) {
                delegate.emitWatermark(watermark);
                isIdle = false;
            }

            /** Records this output as idle and lets the manager decide about the shared output. */
            @Override
            public void markIdle() {
                isIdle = true;
                maybeMarkUnderlyingOutputAsIdle();
            }

            /** Records this output as active and forwards the signal. */
            @Override
            public void markActive() {
                isIdle = false;
                delegate.markActive();
            }
        }
    }

    /**
     * Holds the per-split outputs of a source reader. Every split gets its own
     * {@link SourceOutputWithWatermarks}, its own entry in a shared
     * {@link WatermarkOutputMultiplexer}, and its own {@link PausableRelativeClock} that is
     * registered as a back-pressure listener on the task's IO metric group.
     */
    private static final class SplitLocalOutputs<T> {
        private final WatermarkOutputMultiplexer watermarkMultiplexer;
        private final PushingAsyncDataInput.DataOutput<T> recordOutput;
        private final TimestampAssigner<T> timestampAssigner;
        private final WatermarkGeneratorSupplier<T> watermarksFactory;
        private final TimestampsAndWatermarks.TimestampsAndWatermarksContextProvider watermarksContextProvider;
        private final TimestampsAndWatermarks.WatermarkUpdateListener watermarkUpdateListener;
        private final Clock clock;
        private final TaskIOMetricGroup taskIOMetricGroup;
        // Split id -> input-activity clock of that split.
        private final Map<String, PausableRelativeClock> inputActivityClocks = new HashMap<>();
        // Split id -> output of that split; insertion-ordered so periodic emission is deterministic.
        private final Map<String, SourceOutputWithWatermarks<T>> localOutputs = new LinkedHashMap<>();

        /**
         * @param recordOutput output that receives the records of all splits
         * @param watermarkOutput output wrapped by the watermark multiplexer
         * @param watermarkUpdateListener notified about per-split watermark updates and finished splits
         * @param timestampAssigner timestamp assigner shared by all split outputs
         * @param watermarksFactory creates one watermark generator per split
         * @param watermarksContextProvider creates the generator context from a split's clock
         * @param clock base clock wrapped by each split's pausable clock
         * @param taskIOMetricGroup metric group on which split clocks are registered as listeners
         */
        private SplitLocalOutputs(
                PushingAsyncDataInput.DataOutput<T> recordOutput,
                WatermarkOutput watermarkOutput,
                TimestampsAndWatermarks.WatermarkUpdateListener watermarkUpdateListener,
                TimestampAssigner<T> timestampAssigner,
                WatermarkGeneratorSupplier<T> watermarksFactory,
                TimestampsAndWatermarks.TimestampsAndWatermarksContextProvider watermarksContextProvider,
                Clock clock,
                TaskIOMetricGroup taskIOMetricGroup) {
            this.recordOutput = recordOutput;
            this.timestampAssigner = timestampAssigner;
            this.watermarksFactory = watermarksFactory;
            this.watermarksContextProvider = watermarksContextProvider;
            this.watermarkUpdateListener = watermarkUpdateListener;
            this.clock = clock;
            this.taskIOMetricGroup = taskIOMetricGroup;
            this.watermarkMultiplexer = new WatermarkOutputMultiplexer(watermarkOutput);
        }

        /**
         * Returns the output for the given split, creating and registering it on first use.
         *
         * @param splitId id of the split
         * @return the existing output if one was already created for {@code splitId}, otherwise a
         *     newly created one
         */
        SourceOutput<T> createOutputForSplit(String splitId) {
            final SourceOutputWithWatermarks<T> existing = localOutputs.get(splitId);
            if (existing != null) {
                return existing;
            }

            final PausableRelativeClock inputActivityClock = createInputActivityClock(splitId);
            // Forward every watermark update of this split to the listener, tagged with the split id.
            watermarkMultiplexer.registerNewOutput(
                    splitId,
                    watermark -> watermarkUpdateListener.updateCurrentSplitWatermark(splitId, watermark));

            final WatermarkGenerator<T> watermarkGenerator =
                    watermarksFactory.createWatermarkGenerator(
                            watermarksContextProvider.create(inputActivityClock));
            final SourceOutputWithWatermarks<T> created =
                    SourceOutputWithWatermarks.createWithSeparateOutputs(
                            recordOutput,
                            watermarkMultiplexer.getImmediateOutput(splitId),
                            watermarkMultiplexer.getDeferredOutput(splitId),
                            timestampAssigner,
                            watermarkGenerator);
            localOutputs.put(splitId, created);
            return created;
        }

        /** Creates the clock for a split, remembers it, and registers it as back-pressure listener. */
        private PausableRelativeClock createInputActivityClock(String splitId) {
            final PausableRelativeClock inputActivityClock = new PausableRelativeClock(clock);
            inputActivityClocks.put(splitId, inputActivityClock);
            taskIOMetricGroup.registerBackPressureListener(inputActivityClock);
            return inputActivityClock;
        }

        /**
         * Releases everything held for the given split: notifies the listener, drops the output,
         * unregisters it from the multiplexer and unregisters the split's clock.
         *
         * <p>Releasing a split for which no output exists (never created or already released) is
         * tolerated: the listener is still notified, and the remaining steps are no-ops.
         *
         * @param splitId id of the split to release
         */
        void releaseOutputForSplit(String splitId) {
            watermarkUpdateListener.splitFinished(splitId);
            localOutputs.remove(splitId);
            watermarkMultiplexer.unregisterOutput(splitId);

            // The clock only exists if an output was created for this split. Guard instead of
            // failing with a NullPointerException after the other state was already torn down.
            final PausableRelativeClock inputActivityClock = inputActivityClocks.remove(splitId);
            if (inputActivityClock != null) {
                taskIOMetricGroup.unregisterBackPressureListener(inputActivityClock);
            }
        }

        /** Triggers periodic emission on every split output, then on the multiplexer. */
        void emitPeriodicWatermark() {
            for (SourceOutputWithWatermarks<T> output : localOutputs.values()) {
                output.emitPeriodicWatermark();
            }
            watermarkMultiplexer.onPeriodicEmit();
        }

        /**
         * Pauses the clocks of {@code splitsToPause} and un-pauses the clocks of
         * {@code splitsToResume}. Split ids without a registered clock are skipped.
         *
         * @param splitsToPause ids of the splits whose clocks are paused
         * @param splitsToResume ids of the splits whose clocks are un-paused
         */
        public void pauseOrResumeSplits(Collection<String> splitsToPause, Collection<String> splitsToResume) {
            for (String splitId : splitsToPause) {
                final PausableRelativeClock inputActivityClock = inputActivityClocks.get(splitId);
                // No clock means no output was created (or it was released): nothing to pause.
                if (inputActivityClock != null) {
                    inputActivityClock.pause();
                }
            }
            for (String splitId : splitsToResume) {
                final PausableRelativeClock inputActivityClock = inputActivityClocks.get(splitId);
                if (inputActivityClock != null) {
                    inputActivityClock.unPause();
                }
            }
        }
    }

    /**
     * The main {@link ReaderOutput}: a {@link SourceOutputWithWatermarks} for records emitted
     * without a split, which delegates creation and release of per-split outputs to a
     * {@link SplitLocalOutputs} instance.
     */
    private static final class StreamingReaderOutput<T> extends SourceOutputWithWatermarks<T> implements ReaderOutput<T> {
        private final SplitLocalOutputs<T> splitLocalOutputs;

        /**
         * @param recordOutput output that receives the records
         * @param watermarkOutput output passed to the superclass for both of its watermark outputs
         * @param timestampAssigner timestamp assigner passed to the superclass
         * @param watermarkGenerator watermark generator passed to the superclass
         * @param splitLocalOutputs delegate that manages the per-split outputs
         */
        StreamingReaderOutput(
                PushingAsyncDataInput.DataOutput<T> recordOutput,
                WatermarkOutput watermarkOutput,
                TimestampAssigner<T> timestampAssigner,
                WatermarkGenerator<T> watermarkGenerator,
                SplitLocalOutputs<T> splitLocalOutputs) {
            super(recordOutput, watermarkOutput, watermarkOutput, timestampAssigner, watermarkGenerator);
            this.splitLocalOutputs = splitLocalOutputs;
        }

        /** Delegates to {@link SplitLocalOutputs#createOutputForSplit(String)}. */
        @Override
        public SourceOutput<T> createOutputForSplit(String splitId) {
            return splitLocalOutputs.createOutputForSplit(splitId);
        }

        /** Delegates to {@link SplitLocalOutputs#releaseOutputForSplit(String)}. */
        @Override
        public void releaseOutputForSplit(String splitId) {
            splitLocalOutputs.releaseOutputForSplit(splitId);
        }
    }

    public ProgressiveTimestampsAndWatermarks(TimestampAssigner<T> timestampAssigner, WatermarkGeneratorSupplier<T> watermarkGeneratorSupplier, TimestampsAndWatermarks.TimestampsAndWatermarksContextProvider timestampsAndWatermarksContextProvider, ProcessingTimeService processingTimeService, Duration duration, RelativeClock relativeClock, Clock clock, TaskIOMetricGroup taskIOMetricGroup) {
        long j;
        this.timestampAssigner = timestampAssigner;
        this.watermarksFactory = watermarkGeneratorSupplier;
        this.watermarksContextProvider = timestampsAndWatermarksContextProvider;
        this.timeService = processingTimeService;
        this.mainInputActivityClock = relativeClock;
        this.clock = clock;
        this.taskIOMetricGroup = taskIOMetricGroup;
        try {
            j = duration.toMillis();
        } catch (ArithmeticException e) {
            j = Long.MAX_VALUE;
        }
        this.periodicWatermarkInterval = j;
    }

    /**
     * Creates the main reader output together with the per-split outputs. Both share one
     * {@link WatermarkToDataOutput} through an {@link IdlenessManager}.
     *
     * @param dataOutput output that receives records and watermarks
     * @param watermarkUpdateListener listener passed to the watermark output and the split outputs
     * @return the newly created main output
     * @throws IllegalStateException if a main output or per-split outputs were already created
     */
    @Override
    public ReaderOutput<T> createMainOutput(PushingAsyncDataInput.DataOutput<T> dataOutput, TimestampsAndWatermarks.WatermarkUpdateListener watermarkUpdateListener) {
        // Only a single main output may ever be created per instance.
        final boolean nothingCreatedYet = currentMainOutput == null && currentPerSplitOutputs == null;
        Preconditions.checkState(nothingCreatedYet, "already created a main output");

        final WatermarkOutput sharedWatermarkOutput = new WatermarkToDataOutput(dataOutput, watermarkUpdateListener);
        final IdlenessManager idleness = new IdlenessManager(sharedWatermarkOutput);

        final WatermarkGenerator<T> mainGenerator =
                watermarksFactory.createWatermarkGenerator(
                        watermarksContextProvider.create(mainInputActivityClock));

        final SplitLocalOutputs<T> perSplitOutputs =
                new SplitLocalOutputs<>(
                        dataOutput,
                        idleness.getSplitLocalOutput(),
                        watermarkUpdateListener,
                        timestampAssigner,
                        watermarksFactory,
                        watermarksContextProvider,
                        clock,
                        taskIOMetricGroup);
        currentPerSplitOutputs = perSplitOutputs;

        final StreamingReaderOutput<T> mainOutput =
                new StreamingReaderOutput<>(
                        dataOutput,
                        idleness.getMainOutput(),
                        timestampAssigner,
                        mainGenerator,
                        perSplitOutputs);
        currentMainOutput = mainOutput;
        return mainOutput;
    }

    /**
     * Schedules the periodic watermark emission with the configured interval as both initial
     * delay and period. Does nothing when the interval is zero.
     *
     * @throws IllegalStateException if periodic emission was already started
     */
    @Override
    public void startPeriodicWatermarkEmits() {
        Preconditions.checkState(periodicEmitHandle == null, "periodic emitter already started");

        final long interval = periodicWatermarkInterval;
        // An interval of zero means periodic emission is not activated.
        if (interval != 0) {
            periodicEmitHandle = timeService.scheduleWithFixedDelay(this::emitImmediateWatermark, interval, interval);
        }
    }

    /**
     * Cancels the scheduled periodic emission, if any, without interrupting a run that is in
     * progress, and clears the handle. Does nothing when no emission is scheduled.
     */
    @Override
    public void stopPeriodicWatermarkEmits() {
        final ScheduledFuture<?> handle = periodicEmitHandle;
        if (handle == null) {
            return;
        }
        handle.cancel(false);
        periodicEmitHandle = null;
    }

    /**
     * Triggers a periodic watermark emission on the per-split outputs first and on the main
     * output second. Outputs that have not been created yet are skipped.
     *
     * @param wallClockTimestamp not used by this implementation
     */
    @Override
    public void emitImmediateWatermark(long wallClockTimestamp) {
        final SplitLocalOutputs<T> perSplitOutputs = currentPerSplitOutputs;
        if (perSplitOutputs != null) {
            perSplitOutputs.emitPeriodicWatermark();
        }

        final StreamingReaderOutput<T> mainOutput = currentMainOutput;
        if (mainOutput != null) {
            mainOutput.emitPeriodicWatermark();
        }
    }

    /**
     * Pauses and resumes the input-activity clocks of the given splits by delegating to the
     * per-split outputs.
     *
     * <p>If the outputs have not been created yet (no call to {@code createMainOutput}), no
     * split has a clock, so the call is a no-op, matching how {@code emitImmediateWatermark}
     * treats the same nullable field.
     *
     * @param splitsToPause ids of the splits whose clocks are paused
     * @param splitsToResume ids of the splits whose clocks are un-paused
     */
    @Override
    public void pauseOrResumeSplits(Collection<String> splitsToPause, Collection<String> splitsToResume) {
        final SplitLocalOutputs<T> perSplitOutputs = currentPerSplitOutputs;
        if (perSplitOutputs == null) {
            // Nothing has been created yet, hence nothing can be paused or resumed.
            return;
        }
        perSplitOutputs.pauseOrResumeSplits(splitsToPause, splitsToResume);
    }
}
