package org.apache.flink.runtime.asyncprocessing.operators.windowing;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.v2.ListState;
import org.apache.flink.api.common.state.v2.StateDescriptor;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.api.common.state.v2.StateIterator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.asyncprocessing.declare.DeclaredVariable;
import org.apache.flink.runtime.asyncprocessing.operators.windowing.functions.InternalAsyncWindowFunction;
import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger;
import org.apache.flink.runtime.state.v2.adaptor.CompleteStateIterator;
import org.apache.flink.runtime.state.v2.internal.InternalListState;
import org.apache.flink.shaded.guava32.com.google.common.collect.FluentIterable;
import org.apache.flink.shaded.guava32.com.google.common.collect.Iterables;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;

@Internal
/* loaded from: input_file:org/apache/flink/runtime/asyncprocessing/operators/windowing/AsyncEvictingWindowOperator.class */
public class AsyncEvictingWindowOperator<K, IN, OUT, W extends Window> extends AsyncWindowOperator<K, IN, StateIterator<IN>, OUT, W> {
    private static final long serialVersionUID = 1;
    private final Evictor<? super IN, ? super W> evictor;
    private final StateDescriptor<StreamRecord<IN>> evictingWindowStateDescriptor;
    private transient AsyncEvictingWindowOperator<K, IN, OUT, W>.EvictorContext evictorContext;
    private transient InternalListState<K, W, StreamRecord<IN>> evictingWindowState;

    /* loaded from: input_file:org/apache/flink/runtime/asyncprocessing/operators/windowing/AsyncEvictingWindowOperator$EvictorContext.class */
    class EvictorContext implements Evictor.EvictorContext {
        protected DeclaredVariable<W> window;

        public EvictorContext(DeclaredVariable<W> declaredVariable) {
            this.window = declaredVariable;
        }

        @Override // org.apache.flink.streaming.api.windowing.evictors.Evictor.EvictorContext
        public long getCurrentProcessingTime() {
            return AsyncEvictingWindowOperator.this.internalTimerService.currentProcessingTime();
        }

        @Override // org.apache.flink.streaming.api.windowing.evictors.Evictor.EvictorContext
        public long getCurrentWatermark() {
            return AsyncEvictingWindowOperator.this.internalTimerService.currentWatermark();
        }

        @Override // org.apache.flink.streaming.api.windowing.evictors.Evictor.EvictorContext
        public MetricGroup getMetricGroup() {
            return AsyncEvictingWindowOperator.this.getMetricGroup();
        }

        void evictBefore(Iterable<TimestampedValue<IN>> iterable, int i) {
            AsyncEvictingWindowOperator.this.evictor.evictBefore(iterable, i, this.window.get(), this);
        }

        void evictAfter(Iterable<TimestampedValue<IN>> iterable, int i) {
            AsyncEvictingWindowOperator.this.evictor.evictAfter(iterable, i, this.window.get(), this);
        }
    }

    public AsyncEvictingWindowOperator(WindowAssigner<? super IN, W> windowAssigner, TypeSerializer<W> typeSerializer, KeySelector<IN, K> keySelector, TypeSerializer<K> typeSerializer2, StateDescriptor<StreamRecord<IN>> stateDescriptor, InternalAsyncWindowFunction<StateIterator<IN>, OUT, K, W> internalAsyncWindowFunction, AsyncTrigger<? super IN, ? super W> asyncTrigger, Evictor<? super IN, ? super W> evictor, long j, OutputTag<IN> outputTag) {
        super(windowAssigner, typeSerializer, keySelector, typeSerializer2, null, internalAsyncWindowFunction, asyncTrigger, j, outputTag);
        this.evictor = (Evictor) Preconditions.checkNotNull(evictor);
        this.evictingWindowStateDescriptor = (StateDescriptor) Preconditions.checkNotNull(stateDescriptor);
    }

    @Override // org.apache.flink.runtime.asyncprocessing.operators.windowing.AsyncWindowOperator, org.apache.flink.streaming.api.operators.Input
    public void processElement(StreamRecord<IN> streamRecord) throws Exception {
        Collection<W> assignWindows = this.windowAssigner.assignWindows(streamRecord.getValue(), streamRecord.getTimestamp(), this.windowAssignerContext);
        Object currentKey = getCurrentKey();
        ArrayList arrayList = new ArrayList();
        if (this.windowAssigner instanceof MergingWindowAssigner) {
            throw new UnsupportedOperationException("Async WindowOperator not support merging window (e.g. session window) yet.");
        }
        for (W w : assignWindows) {
            if (!isWindowLate(w)) {
                AtomicReference atomicReference = new AtomicReference();
                this.windowDeclaredVariable.set(w);
                arrayList.add(this.evictingWindowState.asyncAdd(streamRecord).thenCompose(r7 -> {
                    StateFuture<TriggerResult> onElement = this.triggerContext.onElement(streamRecord);
                    Objects.requireNonNull(atomicReference);
                    return onElement.thenAccept((v1) -> {
                        r1.set(v1);
                    });
                }).thenConditionallyCompose(r3 -> {
                    return Boolean.valueOf(((TriggerResult) atomicReference.get()).isFire());
                }, r9 -> {
                    return this.evictingWindowState.asyncGet().thenConditionallyCompose((v0) -> {
                        return Objects.nonNull(v0);
                    }, stateIterator -> {
                        return emitWindowContents(currentKey, w, stateIterator, this.evictingWindowState);
                    });
                }).thenConditionallyCompose(tuple2 -> {
                    return Boolean.valueOf(((TriggerResult) atomicReference.get()).isPurge());
                }, tuple22 -> {
                    return this.evictingWindowState.asyncClear();
                }).thenAccept(tuple23 -> {
                    registerCleanupTimer(w);
                }));
            }
        }
        if (arrayList.isEmpty() && isElementLate(streamRecord)) {
            if (this.lateDataOutputTag != null) {
                sideOutput(streamRecord);
            } else {
                this.numLateRecordsDropped.inc();
            }
        }
    }

    @Override // org.apache.flink.runtime.asyncprocessing.operators.windowing.AsyncWindowOperator, org.apache.flink.streaming.api.operators.Triggerable
    public void onEventTime(InternalTimer<K, W> internalTimer) throws Exception {
        this.windowDeclaredVariable.set(internalTimer.getNamespace());
        AtomicReference atomicReference = new AtomicReference();
        if (this.windowAssigner instanceof MergingWindowAssigner) {
            throw new UnsupportedOperationException("Async WindowOperator not support merging window (e.g. session window) yet.");
        }
        StateFuture<TriggerResult> onEventTime = this.triggerContext.onEventTime(internalTimer.getTimestamp());
        Objects.requireNonNull(atomicReference);
        onEventTime.thenAccept((v1) -> {
            r1.set(v1);
        }).thenConditionallyCompose(r3 -> {
            return Boolean.valueOf(((TriggerResult) atomicReference.get()).isFire());
        }, r7 -> {
            return this.evictingWindowState.asyncGet().thenConditionallyCompose((v0) -> {
                return Objects.nonNull(v0);
            }, stateIterator -> {
                return emitWindowContents(internalTimer.getKey(), (Window) internalTimer.getNamespace(), stateIterator, this.evictingWindowState);
            });
        }).thenConditionallyCompose(tuple2 -> {
            return Boolean.valueOf(((TriggerResult) atomicReference.get()).isPurge());
        }, tuple22 -> {
            return this.evictingWindowState.asyncClear();
        }).thenConditionallyCompose(tuple23 -> {
            return Boolean.valueOf(this.windowAssigner.isEventTime() && isCleanupTime((Window) internalTimer.getNamespace(), internalTimer.getTimestamp()));
        }, tuple24 -> {
            return clearAllState((AsyncEvictingWindowOperator<K, IN, OUT, W>) internalTimer.getNamespace(), (ListState) this.evictingWindowState);
        });
    }

    @Override // org.apache.flink.runtime.asyncprocessing.operators.windowing.AsyncWindowOperator, org.apache.flink.streaming.api.operators.Triggerable
    public void onProcessingTime(InternalTimer<K, W> internalTimer) throws Exception {
        this.windowDeclaredVariable.set(internalTimer.getNamespace());
        AtomicReference atomicReference = new AtomicReference();
        if (this.windowAssigner instanceof MergingWindowAssigner) {
            throw new UnsupportedOperationException("Async WindowOperator not support merging window (e.g. session window) yet.");
        }
        StateFuture<TriggerResult> onProcessingTime = this.triggerContext.onProcessingTime(internalTimer.getTimestamp());
        Objects.requireNonNull(atomicReference);
        onProcessingTime.thenAccept((v1) -> {
            r1.set(v1);
        }).thenConditionallyCompose(r3 -> {
            return Boolean.valueOf(((TriggerResult) atomicReference.get()).isFire());
        }, r7 -> {
            return this.evictingWindowState.asyncGet().thenConditionallyCompose((v0) -> {
                return Objects.nonNull(v0);
            }, stateIterator -> {
                return emitWindowContents(internalTimer.getKey(), (Window) internalTimer.getNamespace(), stateIterator, this.evictingWindowState);
            });
        }).thenConditionallyCompose(tuple2 -> {
            return Boolean.valueOf(((TriggerResult) atomicReference.get()).isPurge());
        }, tuple22 -> {
            return this.evictingWindowState.asyncClear();
        }).thenConditionallyCompose(tuple23 -> {
            return Boolean.valueOf(!this.windowAssigner.isEventTime() && isCleanupTime((Window) internalTimer.getNamespace(), internalTimer.getTimestamp()));
        }, tuple24 -> {
            return clearAllState((AsyncEvictingWindowOperator<K, IN, OUT, W>) internalTimer.getNamespace(), (ListState) this.evictingWindowState);
        });
    }

    private StateFuture<Void> emitWindowContents(K k, W w, StateIterator<StreamRecord<IN>> stateIterator, ListState<StreamRecord<IN>> listState) throws Exception {
        this.timestampedCollector.setAbsoluteTimestamp(w.maxTimestamp());
        ArrayList arrayList = new ArrayList();
        return stateIterator.onNext(streamRecord -> {
            arrayList.add(streamRecord);
        }).thenApply(r6 -> {
            FluentIterable transform = FluentIterable.from(arrayList).transform(TimestampedValue::from);
            this.evictorContext.evictBefore(transform, Iterables.size(transform));
            return Tuple2.of(transform, new CompleteStateIterator(transform.transform((v0) -> {
                return v0.getValue();
            })));
        }).thenCompose(tuple2 -> {
            return ((InternalAsyncWindowFunction) this.userFunction).process(k, w, this.processContext, (StateIterator) tuple2.f1, this.timestampedCollector).thenCompose(r7 -> {
                this.evictorContext.evictAfter((Iterable) tuple2.f0, Iterables.size((Iterable) tuple2.f0));
                return listState.asyncUpdate((List) ((FluentIterable) tuple2.f0).stream().map((v0) -> {
                    return v0.getStreamRecord();
                }).collect(Collectors.toList()));
            });
        });
    }

    private StateFuture<Void> clearAllState(W w, ListState<StreamRecord<IN>> listState) {
        return listState.asyncClear().thenCompose(r3 -> {
            return this.triggerContext.clear();
        }).thenCompose(r32 -> {
            return this.processContext.clear();
        });
    }

    @Override // org.apache.flink.runtime.asyncprocessing.operators.windowing.AsyncWindowOperator, org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator, org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void open() throws Exception {
        super.open();
        this.evictorContext = new EvictorContext(this.windowDeclaredVariable);
        this.evictingWindowState = (InternalListState) getOrCreateKeyedState(this.windowSerializer.createInstance2(), this.windowSerializer, this.evictingWindowStateDescriptor);
    }

    @Override // org.apache.flink.runtime.asyncprocessing.operators.windowing.AsyncWindowOperator, org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator, org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateStreamOperator, org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void close() throws Exception {
        super.close();
        this.evictorContext = null;
    }

    @VisibleForTesting
    public Evictor<? super IN, ? super W> getEvictor() {
        return this.evictor;
    }

    @Override // org.apache.flink.runtime.asyncprocessing.operators.windowing.AsyncWindowOperator
    @VisibleForTesting
    public StateDescriptor<?> getStateDescriptor() {
        return this.evictingWindowStateDescriptor;
    }
}
