package org.apache.flink.streaming.runtime.operators.windowing;

import java.time.Duration;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.AppendingState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.api.common.state.v2.StateIterator;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.core.state.StateFutureUtils;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.asyncprocessing.operators.windowing.AsyncEvictingWindowOperator;
import org.apache.flink.runtime.asyncprocessing.operators.windowing.AsyncWindowOperator;
import org.apache.flink.runtime.asyncprocessing.operators.windowing.functions.InternalAggregateProcessAsyncWindowFunction;
import org.apache.flink.runtime.asyncprocessing.operators.windowing.functions.InternalAsyncWindowFunction;
import org.apache.flink.runtime.asyncprocessing.operators.windowing.functions.InternalIterableAsyncWindowFunction;
import org.apache.flink.runtime.asyncprocessing.operators.windowing.functions.InternalIterableProcessAsyncWindowFunction;
import org.apache.flink.runtime.asyncprocessing.operators.windowing.functions.InternalSingleValueAsyncWindowFunction;
import org.apache.flink.runtime.asyncprocessing.operators.windowing.functions.InternalSingleValueProcessAsyncWindowFunction;
import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncCountTrigger;
import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncEventTimeTrigger;
import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncProcessingTimeTrigger;
import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncPurgingTrigger;
import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger;
import org.apache.flink.streaming.api.functions.windowing.AggregateApplyWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ReduceApplyProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ReduceApplyWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalAggregateProcessWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorBuilder.class */
public class WindowOperatorBuilder<T, K, W extends Window> {
    private static final String WINDOW_STATE_NAME = "window-contents";
    private final ExecutionConfig config;
    private final WindowAssigner<? super T, W> windowAssigner;
    private final TypeInformation<T> inputType;
    private final KeySelector<T, K> keySelector;
    private final TypeInformation<K> keyType;
    private Trigger<? super T, ? super W> trigger;
    private AsyncTrigger<? super T, ? super W> asyncTrigger;

    @Nullable
    private Evictor<? super T, ? super W> evictor;
    private long allowedLateness = 0;

    @Nullable
    private OutputTag<T> lateDataOutputTag;

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorBuilder$AsyncTriggerContextConvertor.class */
    private static class AsyncTriggerContextConvertor implements Trigger.TriggerContext {
        private final AsyncTrigger.TriggerContext asyncTriggerContext;

        private AsyncTriggerContextConvertor(AsyncTrigger.TriggerContext triggerContext) {
            this.asyncTriggerContext = triggerContext;
        }

        @Override // org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
        public long getCurrentProcessingTime() {
            return this.asyncTriggerContext.getCurrentProcessingTime();
        }

        @Override // org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
        public MetricGroup getMetricGroup() {
            return this.asyncTriggerContext.getMetricGroup();
        }

        @Override // org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
        public long getCurrentWatermark() {
            return this.asyncTriggerContext.getCurrentWatermark();
        }

        @Override // org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
        public void registerProcessingTimeTimer(long j) {
            this.asyncTriggerContext.registerProcessingTimeTimer(j);
        }

        @Override // org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
        public void registerEventTimeTimer(long j) {
            this.asyncTriggerContext.registerEventTimeTimer(j);
        }

        @Override // org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
        public void deleteProcessingTimeTimer(long j) {
            this.asyncTriggerContext.deleteProcessingTimeTimer(j);
        }

        @Override // org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
        public void deleteEventTimeTimer(long j) {
            this.asyncTriggerContext.deleteEventTimeTimer(j);
        }

        @Override // org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
        public <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) {
            throw new UnsupportedOperationException("Trigger is for state V1 APIs, window operator with async state enabled only accept state V2 APIs.");
        }

        public static Trigger.TriggerContext of(AsyncTrigger.TriggerContext triggerContext) {
            return new AsyncTriggerContextConvertor(triggerContext);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorBuilder$AsyncTriggerConverter.class */
    public static class AsyncTriggerConverter {
        private AsyncTriggerConverter() {
        }

        public static <T, W extends Window> AsyncTrigger<T, W> convertToAsync(Trigger<T, W> trigger) {
            return trigger instanceof CountTrigger ? AsyncCountTrigger.of(((CountTrigger) trigger).getMaxCount()) : trigger instanceof EventTimeTrigger ? AsyncEventTimeTrigger.create() : trigger instanceof ProcessingTimeTrigger ? AsyncProcessingTimeTrigger.create() : trigger instanceof PurgingTrigger ? AsyncPurgingTrigger.of(convertToAsync(((PurgingTrigger) trigger).getNestedTrigger())) : UserDefinedAsyncTrigger.of(trigger);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorBuilder$UserDefinedAsyncTrigger.class */
    public static class UserDefinedAsyncTrigger<T, W extends Window> extends AsyncTrigger<T, W> {
        private final Trigger<T, W> userDefinedTrigger;

        private UserDefinedAsyncTrigger(Trigger<T, W> trigger) {
            this.userDefinedTrigger = trigger;
        }

        @Override // org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger
        public StateFuture<TriggerResult> onElement(T t, long j, W w, AsyncTrigger.TriggerContext triggerContext) throws Exception {
            return StateFutureUtils.completedFuture(this.userDefinedTrigger.onElement(t, j, w, AsyncTriggerContextConvertor.of(triggerContext)));
        }

        @Override // org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger
        public StateFuture<TriggerResult> onProcessingTime(long j, W w, AsyncTrigger.TriggerContext triggerContext) throws Exception {
            return StateFutureUtils.completedFuture(this.userDefinedTrigger.onProcessingTime(j, w, AsyncTriggerContextConvertor.of(triggerContext)));
        }

        @Override // org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger
        public StateFuture<TriggerResult> onEventTime(long j, W w, AsyncTrigger.TriggerContext triggerContext) throws Exception {
            return StateFutureUtils.completedFuture(this.userDefinedTrigger.onEventTime(j, w, AsyncTriggerContextConvertor.of(triggerContext)));
        }

        @Override // org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger
        public StateFuture<Void> clear(W w, AsyncTrigger.TriggerContext triggerContext) throws Exception {
            this.userDefinedTrigger.clear(w, AsyncTriggerContextConvertor.of(triggerContext));
            return StateFutureUtils.completedVoidFuture();
        }

        @Override // org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger
        public boolean isEndOfStreamTrigger() {
            return this.userDefinedTrigger instanceof GlobalWindows.EndOfStreamTrigger;
        }

        public static <T, W extends Window> AsyncTrigger<T, W> of(Trigger<T, W> trigger) {
            return new UserDefinedAsyncTrigger(trigger);
        }
    }

    public WindowOperatorBuilder(WindowAssigner<? super T, W> windowAssigner, Trigger<? super T, ? super W> trigger, ExecutionConfig executionConfig, TypeInformation<T> typeInformation, KeySelector<T, K> keySelector, TypeInformation<K> typeInformation2) {
        this.windowAssigner = windowAssigner;
        this.config = executionConfig;
        this.inputType = typeInformation;
        this.keySelector = keySelector;
        this.keyType = typeInformation2;
        this.trigger = trigger;
    }

    public void trigger(Trigger<? super T, ? super W> trigger) {
        Preconditions.checkNotNull(trigger, "Window triggers cannot be null");
        if ((this.windowAssigner instanceof MergingWindowAssigner) && !trigger.canMerge()) {
            throw new UnsupportedOperationException("A merging window assigner cannot be used with a trigger that does not support merging.");
        }
        this.trigger = trigger;
    }

    public WindowOperatorBuilder<T, K, W> asyncTrigger(AsyncTrigger<? super T, ? super W> asyncTrigger) {
        Preconditions.checkNotNull(asyncTrigger, "AsyncTrigger cannot be null");
        if ((this.windowAssigner instanceof MergingWindowAssigner) && !asyncTrigger.canMerge()) {
            throw new UnsupportedOperationException("A merging window assigner cannot be used with a trigger that does not support merging.");
        }
        this.asyncTrigger = asyncTrigger;
        return this;
    }

    public void allowedLateness(Duration duration) {
        Preconditions.checkNotNull(duration, "Allowed lateness cannot be null");
        long millis = duration.toMillis();
        Preconditions.checkArgument(millis >= 0, "The allowed lateness cannot be negative.");
        this.allowedLateness = millis;
    }

    public void sideOutputLateData(OutputTag<T> outputTag) {
        Preconditions.checkNotNull(outputTag, "Side output tag must not be null.");
        this.lateDataOutputTag = outputTag;
    }

    public void evictor(Evictor<? super T, ? super W> evictor) {
        Preconditions.checkNotNull(evictor, "Evictor cannot be null");
        this.evictor = evictor;
    }

    public <R> OneInputStreamOperator<T, R> reduce(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> windowFunction) {
        Preconditions.checkNotNull(reduceFunction, "ReduceFunction cannot be null");
        Preconditions.checkNotNull(windowFunction, "WindowFunction cannot be null");
        if (reduceFunction instanceof RichFunction) {
            throw new UnsupportedOperationException("ReduceFunction of apply can not be a RichFunction.");
        }
        return this.evictor != null ? buildEvictingWindowOperator(new InternalIterableWindowFunction(new ReduceApplyWindowFunction(reduceFunction, windowFunction))) : buildWindowOperator(new ReducingStateDescriptor(WINDOW_STATE_NAME, reduceFunction, this.inputType.createSerializer(this.config.getSerializerConfig())), new InternalSingleValueWindowFunction(windowFunction));
    }

    public <R> OneInputStreamOperator<T, R> asyncReduce(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> windowFunction) {
        Preconditions.checkNotNull(reduceFunction, "ReduceFunction cannot be null");
        Preconditions.checkNotNull(windowFunction, "WindowFunction cannot be null");
        if (reduceFunction instanceof RichFunction) {
            throw new UnsupportedOperationException("ReduceFunction of apply can not be a RichFunction.");
        }
        return this.evictor != null ? buildAsyncEvictingWindowOperator(new InternalIterableAsyncWindowFunction(new ReduceApplyWindowFunction(reduceFunction, windowFunction))) : buildAsyncWindowOperator(new org.apache.flink.api.common.state.v2.ReducingStateDescriptor(WINDOW_STATE_NAME, reduceFunction, this.inputType.createSerializer(this.config.getSerializerConfig())), new InternalSingleValueAsyncWindowFunction(windowFunction));
    }

    public <R> OneInputStreamOperator<T, R> reduce(ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> processWindowFunction) {
        Preconditions.checkNotNull(reduceFunction, "ReduceFunction cannot be null");
        Preconditions.checkNotNull(processWindowFunction, "ProcessWindowFunction cannot be null");
        if (reduceFunction instanceof RichFunction) {
            throw new UnsupportedOperationException("ReduceFunction of apply can not be a RichFunction.");
        }
        return this.evictor != null ? buildEvictingWindowOperator(new InternalIterableProcessWindowFunction(new ReduceApplyProcessWindowFunction(reduceFunction, processWindowFunction))) : buildWindowOperator(new ReducingStateDescriptor(WINDOW_STATE_NAME, reduceFunction, this.inputType.createSerializer(this.config.getSerializerConfig())), new InternalSingleValueProcessWindowFunction(processWindowFunction));
    }

    public <R> OneInputStreamOperator<T, R> asyncReduce(ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> processWindowFunction) {
        Preconditions.checkNotNull(reduceFunction, "ReduceFunction cannot be null");
        Preconditions.checkNotNull(processWindowFunction, "ProcessWindowFunction cannot be null");
        if (reduceFunction instanceof RichFunction) {
            throw new UnsupportedOperationException("ReduceFunction of apply can not be a RichFunction.");
        }
        return this.evictor != null ? buildAsyncEvictingWindowOperator(new InternalIterableProcessAsyncWindowFunction(new ReduceApplyProcessWindowFunction(reduceFunction, processWindowFunction))) : buildAsyncWindowOperator(new org.apache.flink.api.common.state.v2.ReducingStateDescriptor(WINDOW_STATE_NAME, reduceFunction, this.inputType.createSerializer(this.config.getSerializerConfig())), new InternalSingleValueProcessAsyncWindowFunction(processWindowFunction));
    }

    public <ACC, V, R> OneInputStreamOperator<T, R> aggregate(AggregateFunction<T, ACC, V> aggregateFunction, WindowFunction<V, R, K, W> windowFunction, TypeInformation<ACC> typeInformation) {
        Preconditions.checkNotNull(aggregateFunction, "AggregateFunction cannot be null");
        Preconditions.checkNotNull(windowFunction, "WindowFunction cannot be null");
        if (aggregateFunction instanceof RichFunction) {
            throw new UnsupportedOperationException("This aggregate function cannot be a RichFunction.");
        }
        return this.evictor != null ? buildEvictingWindowOperator(new InternalIterableWindowFunction(new AggregateApplyWindowFunction(aggregateFunction, windowFunction))) : buildWindowOperator(new AggregatingStateDescriptor(WINDOW_STATE_NAME, aggregateFunction, typeInformation.createSerializer(this.config.getSerializerConfig())), new InternalSingleValueWindowFunction(windowFunction));
    }

    public <ACC, V, R> OneInputStreamOperator<T, R> asyncAggregate(AggregateFunction<T, ACC, V> aggregateFunction, WindowFunction<V, R, K, W> windowFunction, TypeInformation<ACC> typeInformation) {
        Preconditions.checkNotNull(aggregateFunction, "AggregateFunction cannot be null");
        Preconditions.checkNotNull(windowFunction, "WindowFunction cannot be null");
        if (aggregateFunction instanceof RichFunction) {
            throw new UnsupportedOperationException("This aggregate function cannot be a RichFunction.");
        }
        return this.evictor != null ? buildAsyncEvictingWindowOperator(new InternalIterableAsyncWindowFunction(new AggregateApplyWindowFunction(aggregateFunction, windowFunction))) : buildAsyncWindowOperator(new org.apache.flink.api.common.state.v2.AggregatingStateDescriptor(WINDOW_STATE_NAME, aggregateFunction, typeInformation.createSerializer(this.config.getSerializerConfig())), new InternalSingleValueAsyncWindowFunction(windowFunction));
    }

    public <ACC, V, R> OneInputStreamOperator<T, R> aggregate(AggregateFunction<T, ACC, V> aggregateFunction, ProcessWindowFunction<V, R, K, W> processWindowFunction, TypeInformation<ACC> typeInformation) {
        Preconditions.checkNotNull(aggregateFunction, "AggregateFunction cannot be null");
        Preconditions.checkNotNull(processWindowFunction, "ProcessWindowFunction cannot be null");
        if (aggregateFunction instanceof RichFunction) {
            throw new UnsupportedOperationException("This aggregate function cannot be a RichFunction.");
        }
        return this.evictor != null ? buildEvictingWindowOperator(new InternalAggregateProcessWindowFunction(aggregateFunction, processWindowFunction)) : buildWindowOperator(new AggregatingStateDescriptor(WINDOW_STATE_NAME, aggregateFunction, typeInformation.createSerializer(this.config.getSerializerConfig())), new InternalSingleValueProcessWindowFunction(processWindowFunction));
    }

    public <ACC, V, R> OneInputStreamOperator<T, R> asyncAggregate(AggregateFunction<T, ACC, V> aggregateFunction, ProcessWindowFunction<V, R, K, W> processWindowFunction, TypeInformation<ACC> typeInformation) {
        Preconditions.checkNotNull(aggregateFunction, "AggregateFunction cannot be null");
        Preconditions.checkNotNull(processWindowFunction, "ProcessWindowFunction cannot be null");
        if (aggregateFunction instanceof RichFunction) {
            throw new UnsupportedOperationException("This aggregate function cannot be a RichFunction.");
        }
        return this.evictor != null ? buildAsyncEvictingWindowOperator(new InternalAggregateProcessAsyncWindowFunction(aggregateFunction, processWindowFunction)) : buildAsyncWindowOperator(new org.apache.flink.api.common.state.v2.AggregatingStateDescriptor(WINDOW_STATE_NAME, aggregateFunction, typeInformation.createSerializer(this.config.getSerializerConfig())), new InternalSingleValueProcessAsyncWindowFunction(processWindowFunction));
    }

    public <R> OneInputStreamOperator<T, R> apply(WindowFunction<T, R, K, W> windowFunction) {
        Preconditions.checkNotNull(windowFunction, "WindowFunction cannot be null");
        return apply(new InternalIterableWindowFunction(windowFunction));
    }

    public <R> OneInputStreamOperator<T, R> process(ProcessWindowFunction<T, R, K, W> processWindowFunction) {
        Preconditions.checkNotNull(processWindowFunction, "ProcessWindowFunction cannot be null");
        return apply(new InternalIterableProcessWindowFunction(processWindowFunction));
    }

    /* JADX WARN: Multi-variable type inference failed */
    private <R> OneInputStreamOperator<T, R> apply(InternalWindowFunction<Iterable<T>, R, K, W> internalWindowFunction) {
        return this.evictor != null ? buildEvictingWindowOperator(internalWindowFunction) : buildWindowOperator(new ListStateDescriptor(WINDOW_STATE_NAME, this.inputType.createSerializer(this.config.getSerializerConfig())), internalWindowFunction);
    }

    public <R> OneInputStreamOperator<T, R> asyncApply(WindowFunction<T, R, K, W> windowFunction) {
        Preconditions.checkNotNull(windowFunction, "WindowFunction cannot be null");
        return asyncApply(new InternalIterableAsyncWindowFunction(windowFunction));
    }

    public <R> OneInputStreamOperator<T, R> asyncProcess(ProcessWindowFunction<T, R, K, W> processWindowFunction) {
        Preconditions.checkNotNull(processWindowFunction, "ProcessWindowFunction cannot be null");
        return asyncApply(new InternalIterableProcessAsyncWindowFunction(processWindowFunction));
    }

    /* JADX WARN: Multi-variable type inference failed */
    private <R> OneInputStreamOperator<T, R> asyncApply(InternalAsyncWindowFunction<StateIterator<T>, R, K, W> internalAsyncWindowFunction) {
        return this.evictor != null ? buildAsyncEvictingWindowOperator(internalAsyncWindowFunction) : buildAsyncWindowOperator(new org.apache.flink.api.common.state.v2.ListStateDescriptor(WINDOW_STATE_NAME, this.inputType.createSerializer(this.config.getSerializerConfig())), internalAsyncWindowFunction);
    }

    private <ACC, R> WindowOperator<K, T, ACC, R, W> buildWindowOperator(StateDescriptor<? extends AppendingState<T, ACC>, ?> stateDescriptor, InternalWindowFunction<ACC, R, K, W> internalWindowFunction) {
        return new WindowOperator<>(this.windowAssigner, this.windowAssigner.getWindowSerializer(this.config), this.keySelector, this.keyType.createSerializer(this.config.getSerializerConfig()), stateDescriptor, internalWindowFunction, this.trigger, this.allowedLateness, this.lateDataOutputTag);
    }

    private <R> WindowOperator<K, T, Iterable<T>, R, W> buildEvictingWindowOperator(InternalWindowFunction<Iterable<T>, R, K, W> internalWindowFunction) {
        return new EvictingWindowOperator(this.windowAssigner, this.windowAssigner.getWindowSerializer(this.config), this.keySelector, this.keyType.createSerializer(this.config.getSerializerConfig()), new ListStateDescriptor(WINDOW_STATE_NAME, new StreamElementSerializer(this.inputType.createSerializer(this.config.getSerializerConfig()))), internalWindowFunction, this.trigger, this.evictor, this.allowedLateness, this.lateDataOutputTag);
    }

    private <ACC, R> AsyncWindowOperator<K, T, ACC, R, W> buildAsyncWindowOperator(org.apache.flink.api.common.state.v2.StateDescriptor<?> stateDescriptor, InternalAsyncWindowFunction<ACC, R, K, W> internalAsyncWindowFunction) {
        return new AsyncWindowOperator<>(this.windowAssigner, this.windowAssigner.getWindowSerializer(this.config), this.keySelector, this.keyType.createSerializer(this.config.getSerializerConfig()), stateDescriptor, internalAsyncWindowFunction, this.asyncTrigger == null ? AsyncTriggerConverter.convertToAsync(this.trigger) : this.asyncTrigger, this.allowedLateness, this.lateDataOutputTag);
    }

    private <R> AsyncWindowOperator<K, T, StateIterator<T>, R, W> buildAsyncEvictingWindowOperator(InternalAsyncWindowFunction<StateIterator<T>, R, K, W> internalAsyncWindowFunction) {
        return new AsyncEvictingWindowOperator(this.windowAssigner, this.windowAssigner.getWindowSerializer(this.config), this.keySelector, this.keyType.createSerializer(this.config.getSerializerConfig()), new org.apache.flink.api.common.state.v2.ListStateDescriptor(WINDOW_STATE_NAME, new StreamElementSerializer(this.inputType.createSerializer(this.config.getSerializerConfig()))), internalAsyncWindowFunction, this.asyncTrigger == null ? AsyncTriggerConverter.convertToAsync(this.trigger) : this.asyncTrigger, this.evictor, this.allowedLateness, this.lateDataOutputTag);
    }

    protected static String generateFunctionName(Function function) {
        Class<?> cls = function.getClass();
        return cls.isAnonymousClass() ? cls.getInterfaces().length == 0 ? cls.getSuperclass().getSimpleName() + cls.getName().substring(cls.getEnclosingClass().getName().length()) : cls.getInterfaces()[0].getSimpleName() + cls.getName().substring(cls.getEnclosingClass().getName().length()) : cls.getSimpleName();
    }

    public String generateOperatorName() {
        return this.windowAssigner.getClass().getSimpleName();
    }

    public String generateOperatorDescription(Function function, @Nullable Function function2) {
        return "Window(" + this.windowAssigner + ", " + this.trigger.getClass().getSimpleName() + ", " + (this.evictor == null ? "" : this.evictor.getClass().getSimpleName() + ", ") + generateFunctionName(function) + (function2 == null ? "" : ", " + generateFunctionName(function2)) + ")";
    }

    @VisibleForTesting
    public long getAllowedLateness() {
        return this.allowedLateness;
    }
}
