package org.apache.flink.streaming.runtime.watermark.extension.eventtime;

import java.util.ArrayList;
import java.util.BitSet;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.flink.api.common.watermark.BoolWatermark;
import org.apache.flink.api.common.watermark.LongWatermark;
import org.apache.flink.datastream.api.extension.eventtime.EventTimeExtension;
import org.apache.flink.runtime.event.WatermarkEvent;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/streaming/runtime/watermark/extension/eventtime/EventTimeWatermarkHandler.class */
public class EventTimeWatermarkHandler {
    // Number of inputs tracked by this handler; the constructor only accepts 1 or 2.
    private final int numOfInput;
    // Target that the combined event-time and idle-status watermark events are emitted to.
    private final Output<?> output;
    // Per-input state (event time + idle flag), indexed by input index; holds numOfInput entries.
    private final List<EventTimeWithIdleStatus> eventTimePerInput;

    // Optional; when non-null it is advanced with every event time this handler emits.
    @Nullable
    private final InternalTimeServiceManager<?> timeServiceManager;
    // Last event time emitted to the output; stays at Long.MIN_VALUE until the first emission.
    private long lastEmitWatermark = Long.MIN_VALUE;
    // Last combined idle status emitted to the output; starts as "not idle".
    private boolean lastEmitIdleStatus = false;
    // Bit i is set once input i has delivered an event-time or an idle-status watermark.
    private final BitSet hasReceiveWatermarks;

    /* loaded from: input_file:org/apache/flink/streaming/runtime/watermark/extension/eventtime/EventTimeWatermarkHandler$EventTimeUpdateStatus.class */
    /**
     * Immutable result of handling one watermark: tells whether the emitted event time moved
     * forward and, if so, the value it moved to.
     */
    public static class EventTimeUpdateStatus {
        private final boolean isEventTimeUpdated;
        private final long newEventTime;

        /** Shared instance for "nothing changed"; its event time is the placeholder -1. */
        public static final EventTimeUpdateStatus NO_UPDATE = new EventTimeUpdateStatus(false, -1L);

        private EventTimeUpdateStatus(boolean updated, long eventTime) {
            isEventTimeUpdated = updated;
            newEventTime = eventTime;
        }

        /**
         * Builds a status that reports an update to the given event time.
         *
         * @param j the new event time
         * @return a status whose {@link #isEventTimeUpdated()} is {@code true}
         */
        public static EventTimeUpdateStatus ofUpdatedWatermark(long j) {
            return new EventTimeUpdateStatus(true, j);
        }

        /** @return {@code true} if this status reports an updated event time */
        public boolean isEventTimeUpdated() {
            return isEventTimeUpdated;
        }

        /** @return the event time carried by this status ({@code -1} for {@link #NO_UPDATE}) */
        public long getNewEventTime() {
            return newEventTime;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/watermark/extension/eventtime/EventTimeWatermarkHandler$EventTimeWithIdleStatus.class */
    /**
     * Mutable state kept for a single input: the highest event time recorded so far and whether
     * the input is currently flagged idle.
     */
    public static class EventTimeWithIdleStatus {
        private boolean isIdle = false;
        private long eventTime = Long.MIN_VALUE;

        EventTimeWithIdleStatus() {}

        /** @return {@code true} if the input is currently flagged idle */
        public boolean isIdle() {
            return isIdle;
        }

        /**
         * Overwrites the idle flag.
         *
         * @param z the new idle flag
         */
        public void setIdleStatus(boolean z) {
            isIdle = z;
        }

        /** @return the highest event time recorded so far, or {@code Long.MIN_VALUE} if none */
        public long getEventTime() {
            return eventTime;
        }

        /**
         * Raises the stored event time to {@code j}; a value that is not larger than the stored
         * one is ignored, so the event time never moves backwards.
         *
         * @param j candidate event time
         */
        public void setEventTime(long j) {
            if (j > eventTime) {
                eventTime = j;
            }
        }
    }

    /**
     * Creates a handler that combines the event-time and idle-status watermarks of {@code i}
     * inputs. Every input starts with event time {@code Long.MIN_VALUE} and is not idle.
     *
     * @param i number of inputs; must be 1 or 2, otherwise the argument check fails
     * @param output target that combined watermark events are emitted to
     * @param internalTimeServiceManager optional time service manager that is advanced whenever a
     *     new event time is emitted; may be {@code null}
     */
    public EventTimeWatermarkHandler(int i, Output<?> output, @Nullable InternalTimeServiceManager<?> internalTimeServiceManager) {
        Preconditions.checkArgument(i >= 1 && i <= 2, "numOfInput should between 1 and 2");
        this.numOfInput = i;
        this.output = output;
        this.timeServiceManager = internalTimeServiceManager;
        this.hasReceiveWatermarks = new BitSet(i);

        // Diamond instead of the raw ArrayList: keeps the element type checked by the compiler.
        this.eventTimePerInput = new ArrayList<>(i);
        for (int inputIndex = 0; inputIndex < i; inputIndex++) {
            this.eventTimePerInput.add(new EventTimeWithIdleStatus());
        }
    }

    /**
     * Records an event-time watermark received from one input and then tries to advance and emit
     * the combined event time.
     *
     * @param j event time carried by the received watermark
     * @param i index of the input the watermark arrived on; must be below {@code numOfInput}
     * @return the result of the emission attempt
     * @throws Exception if emitting or advancing the time service fails
     */
    private EventTimeUpdateStatus processEventTime(long j, int i) throws Exception {
        Preconditions.checkState(i < numOfInput);
        hasReceiveWatermarks.set(i);

        // An input that delivers an event time is treated as active again.
        final EventTimeWithIdleStatus inputState = eventTimePerInput.get(i);
        inputState.setEventTime(j);
        inputState.setIdleStatus(false);

        return tryAdvanceEventTimeAndEmitWatermark();
    }

    /**
     * Emits a new event-time watermark if the combined event time is strictly greater than the
     * last emitted one and every input has already delivered at least one watermark. On emission
     * the time service manager, when present, is advanced to the same time.
     *
     * @return {@link EventTimeUpdateStatus#NO_UPDATE} if nothing was emitted, otherwise a status
     *     carrying the newly emitted event time
     * @throws Exception if emitting or advancing the time service fails
     */
    private EventTimeUpdateStatus tryAdvanceEventTimeAndEmitWatermark() throws Exception {
        final long candidate = getCurrentEventTime();
        final boolean movesForward = candidate > lastEmitWatermark;
        final boolean everyInputReported = hasReceiveWatermarks.cardinality() == numOfInput;
        if (!(movesForward && everyInputReported)) {
            return EventTimeUpdateStatus.NO_UPDATE;
        }

        output.emitWatermark(
                new WatermarkEvent(
                        EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION.newWatermark(candidate),
                        false));
        // Only remembered after the emit call returned, so a failed emit leaves the state as is.
        lastEmitWatermark = candidate;

        if (timeServiceManager != null) {
            timeServiceManager.advanceWatermark(new Watermark(candidate));
        }
        return EventTimeUpdateStatus.ofUpdatedWatermark(lastEmitWatermark);
    }

    /**
     * Records an idle-status watermark received from one input and emits the combined idle status
     * if it changed.
     *
     * @param z idle flag carried by the received watermark
     * @param i index of the input the watermark arrived on; must be below {@code numOfInput}
     */
    private void processEventTimeIdleStatus(boolean z, int i) {
        Preconditions.checkState(i < numOfInput);

        // An idle-status watermark also counts as "this input has reported".
        hasReceiveWatermarks.set(i);

        final EventTimeWithIdleStatus inputState = eventTimePerInput.get(i);
        inputState.setIdleStatus(z);

        tryEmitEventTimeIdleStatus();
    }

    /**
     * Emits an idle-status watermark when the combined idle status (all inputs idle or not)
     * differs from the one emitted last, and remembers the emitted value.
     */
    private void tryEmitEventTimeIdleStatus() {
        final boolean combinedIdle = isAllInputIdle();
        if (combinedIdle == lastEmitIdleStatus) {
            // Nothing changed since the last emission.
            return;
        }

        output.emitWatermark(
                new WatermarkEvent(
                        EventTimeExtension.IDLE_STATUS_WATERMARK_DECLARATION.newWatermark(combinedIdle),
                        false));
        lastEmitIdleStatus = combinedIdle;
    }

    /**
     * Computes the combined event time as the minimum event time over all inputs that are not
     * idle.
     *
     * @return the smallest event time among non-idle inputs, or {@code Long.MAX_VALUE} when every
     *     input is idle
     */
    private long getCurrentEventTime() {
        long minEventTime = Long.MAX_VALUE;
        for (EventTimeWithIdleStatus inputState : eventTimePerInput) {
            if (inputState.isIdle()) {
                // Idle inputs do not hold back the combined event time.
                continue;
            }
            final long inputEventTime = inputState.getEventTime();
            if (inputEventTime < minEventTime) {
                minEventTime = inputEventTime;
            }
        }
        return minEventTime;
    }

    /**
     * Checks whether every input is currently flagged idle.
     *
     * @return {@code true} if no input is active, {@code false} as soon as one input is not idle
     */
    private boolean isAllInputIdle() {
        for (EventTimeWithIdleStatus inputState : eventTimePerInput) {
            if (!inputState.isIdle()) {
                return false;
            }
        }
        return true;
    }

    /**
     * Returns the event time most recently emitted by this handler.
     *
     * @return the last emitted event time, or {@code Long.MIN_VALUE} if none was emitted yet
     */
    public long getLastEmitWatermark() {
        return lastEmitWatermark;
    }

    /**
     * Entry point for a watermark received on one input. Event-time watermarks update the input's
     * event time and may trigger an emission; idle-status watermarks update the input's idle flag.
     * Watermarks of any other kind are ignored.
     *
     * @param watermark the received watermark
     * @param i index of the input the watermark arrived on
     * @return the outcome of the event-time emission attempt for event-time watermarks, otherwise
     *     {@link EventTimeUpdateStatus#NO_UPDATE}
     * @throws Exception if emitting or advancing the time service fails
     */
    public EventTimeUpdateStatus processWatermark(org.apache.flink.api.common.watermark.Watermark watermark, int i) throws Exception {
        final boolean carriesEventTime = EventTimeExtension.isEventTimeWatermark(watermark.getIdentifier());
        if (!carriesEventTime) {
            if (EventTimeExtension.isIdleStatusWatermark(watermark.getIdentifier())) {
                final BoolWatermark idleWatermark = (BoolWatermark) watermark;
                processEventTimeIdleStatus(idleWatermark.getValue(), i);
            }
            // Idle-status and unrelated watermarks never report an event-time update.
            return EventTimeUpdateStatus.NO_UPDATE;
        }

        final LongWatermark eventTimeWatermark = (LongWatermark) watermark;
        return processEventTime(eventTimeWatermark.getValue(), i);
    }
}
