/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.window.groupwindow.assigners;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.NavigableSet;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.window.MergeCallback;
import org.apache.flink.table.runtime.operators.window.TimeWindow;
import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.InternalTimeWindowAssigner;
import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.MergingWindowAssigner;

/**
 * Merging window assigner that gives every element its own {@link TimeWindow} built from the
 * element timestamp and {@code timestamp + sessionGap}, and that folds a freshly assigned
 * window together with its directly neighbouring windows when they intersect.
 *
 * <p>Instances are obtained through {@link #withGap(Duration)}; the time domain can then be
 * switched with {@link #withEventTime()} or {@link #withProcessingTime()}, each of which
 * returns a new assigner and leaves the receiver untouched.
 */
public class SessionWindowAssigner extends MergingWindowAssigner<TimeWindow>
        implements InternalTimeWindowAssigner {
    private static final long serialVersionUID = -2595385378968688228L;

    // NOTE(review): the class declares a serialVersionUID, so these field names are
    // presumably part of its serialized form -- keep them stable.
    /** Gap added to an element timestamp to build its window; milliseconds when created via {@link #withGap(Duration)}. */
    private final long sessionGap;
    /** Value reported by {@link #isEventTime()}. */
    private final boolean isEventTime;

    /**
     * Creates an assigner with the given gap and time domain.
     *
     * @param sessionGap gap used when building windows; must be strictly positive
     * @param isEventTime value later returned by {@link #isEventTime()}
     * @throws IllegalArgumentException if {@code sessionGap} is zero or negative
     */
    protected SessionWindowAssigner(long sessionGap, boolean isEventTime) {
        boolean gapIsPositive = sessionGap > 0L;
        if (!gapIsPositive) {
            throw new IllegalArgumentException("SessionWindowAssigner parameters must satisfy 0 < size");
        }
        this.isEventTime = isEventTime;
        this.sessionGap = sessionGap;
    }

    /**
     * Creates an assigner whose gap is the millisecond value of {@code size}; the returned
     * assigner reports {@code true} from {@link #isEventTime()}.
     *
     * @param size session gap; converted with {@link Duration#toMillis()}
     * @return a new assigner
     * @throws IllegalArgumentException if the converted gap is not strictly positive
     */
    public static SessionWindowAssigner withGap(Duration size) {
        long gapMillis = size.toMillis();
        return new SessionWindowAssigner(gapMillis, true);
    }

    /**
     * Assigns the element to exactly one window derived from its timestamp.
     *
     * @param element the incoming row; not inspected by this assigner
     * @param timestamp timestamp used as the first constructor argument of the window
     * @return a single-element collection holding
     *     {@code new TimeWindow(timestamp, timestamp + sessionGap)}
     */
    @Override
    public Collection<TimeWindow> assignWindows(RowData element, long timestamp) {
        TimeWindow session = new TimeWindow(timestamp, timestamp + sessionGap);
        return Collections.singletonList(session);
    }

    /**
     * Tries to merge {@code newWindow} with its ceiling and floor entries in
     * {@code sortedWindows}.
     *
     * <p>The ceiling is examined first, then the floor; the floor is tested against the
     * result of the ceiling step, not against the raw {@code newWindow}. The callback is
     * invoked only when at least one neighbour was absorbed, and in that case the set handed
     * to it also contains {@code newWindow} itself.
     *
     * @param newWindow the window that was just assigned
     * @param sortedWindows the existing windows, queried via {@code ceiling} and {@code floor}
     * @param callback receives the merge result and the set of windows that were merged
     * @throws IllegalStateException if the callback throws; the original exception is the cause
     */
    @Override
    public void mergeWindows(
            TimeWindow newWindow,
            NavigableSet<TimeWindow> sortedWindows,
            MergeCallback<TimeWindow, Collection<TimeWindow>> callback) {
        // Evaluated left to right: ceiling lookup first, floor lookup second.
        final TimeWindow[] neighbours = {
            sortedWindows.ceiling(newWindow), sortedWindows.floor(newWindow)
        };

        Collection<TimeWindow> absorbed = new HashSet<TimeWindow>();
        TimeWindow combined = newWindow;
        for (TimeWindow neighbour : neighbours) {
            if (neighbour == null || !combined.intersects(neighbour)) {
                continue;
            }
            absorbed.add(neighbour);
            combined = combined.cover(neighbour);
        }

        if (absorbed.isEmpty()) {
            // Nothing intersected, so there is nothing to report.
            return;
        }

        // A merge took place: the new window is one of the merged windows as well.
        absorbed.add(newWindow);
        try {
            callback.merge(combined, absorbed);
        } catch (Exception e) {
            throw new IllegalStateException("Failed to merge windows in " + this, e);
        }
    }

    /**
     * Returns a fresh {@code TimeWindow.Serializer}.
     *
     * @param executionConfig not used by this implementation
     * @return a new serializer instance
     */
    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    /**
     * Returns the time-domain flag this assigner was constructed with.
     *
     * @return the stored {@code isEventTime} flag
     */
    @Override
    public boolean isEventTime() {
        return isEventTime;
    }

    /**
     * Returns a new assigner with the same gap whose {@link #isEventTime()} is {@code true}.
     *
     * @return a new assigner; the receiver is unchanged
     */
    @Override
    public SessionWindowAssigner withEventTime() {
        return new SessionWindowAssigner(sessionGap, true);
    }

    /**
     * Returns a new assigner with the same gap whose {@link #isEventTime()} is {@code false}.
     *
     * @return a new assigner; the receiver is unchanged
     */
    @Override
    public SessionWindowAssigner withProcessingTime() {
        return new SessionWindowAssigner(sessionGap, false);
    }

    /**
     * Renders the assigner as {@code SessionWindow(<gap>)}.
     *
     * @return the textual form, also used in the merge failure message
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("SessionWindow(");
        text.append(sessionGap);
        text.append(")");
        return text.toString();
    }
}

