package org.apache.flink.table.runtime.operators.rank.async;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.v2.MapState;
import org.apache.flink.api.common.state.v2.MapStateDescriptor;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.core.state.StateFutureUtils;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.rank.RankRange;
import org.apache.flink.table.runtime.operators.rank.RankType;
import org.apache.flink.table.runtime.operators.rank.TopNBuffer;
import org.apache.flink.table.runtime.operators.rank.utils.AppendOnlyTopNHelper;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.util.Collector;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/rank/async/AsyncStateAppendOnlyTopNFunction.class */
/**
 * Top-N process function that keeps its data in asynchronous (state v2) keyed state.
 *
 * <p>For every input row the function obtains the {@link TopNBuffer} of the current key (from the
 * helper's cache, or rebuilt from {@link #dataState}), and, if the row's sort key passes
 * {@code checkSortKeyInBufferRange}, adds a copy of the row to the buffer, writes the rows of that
 * sort key back to state and lets the helper produce the output.
 *
 * <p>NOTE(review): judging by the class name this variant is meant for append-only input; nothing
 * in this class handles retractions — confirm against the planner rule that instantiates it.
 */
public class AsyncStateAppendOnlyTopNFunction extends AbstractAsyncStateTopNFunction {
    private static final long serialVersionUID = 1L;

    /** Type information of the sort key; used as the key type of {@link #dataState}. */
    private final InternalTypeInfo<RowData> sortKeyType;

    /** Serializer used to copy each input row before it is stored in the buffer. */
    private final TypeSerializer<RowData> inputRowSer;

    /** Cache size handed to the helper when it is created in {@link #open(OpenContext)}. */
    private final long cacheSize;

    /** Map state from sort key to the list of rows currently held for that sort key. */
    private transient MapState<RowData, List<RowData>> dataState;

    /** Helper providing the per-key buffer cache, the metrics and the emit logic. */
    private transient AsyncStateAppendOnlyTopNHelper helper;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/runtime/operators/rank/async/AsyncStateAppendOnlyTopNFunction$AsyncStateAppendOnlyTopNHelper.class */
    /**
     * Binds {@link AppendOnlyTopNHelper} to the asynchronous {@link #dataState} of the enclosing
     * function: state removals and updates requested by the helper are issued as
     * {@code asyncRemove} / {@code asyncPut} calls whose returned futures are not awaited here.
     */
    public class AsyncStateAppendOnlyTopNHelper extends AppendOnlyTopNHelper {
        public AsyncStateAppendOnlyTopNHelper() {
            super(
                    AsyncStateAppendOnlyTopNFunction.this,
                    AsyncStateAppendOnlyTopNFunction.this.cacheSize,
                    AsyncStateAppendOnlyTopNFunction.this.getDefaultTopNSize());
        }

        /** Removes the entry stored under {@code rowData} (the sort key) from the data state. */
        @Override
        protected void removeFromState(RowData rowData) throws Exception {
            AsyncStateAppendOnlyTopNFunction.this.dataState.asyncRemove(rowData);
        }

        /** Stores {@code list} under {@code rowData} (the sort key) in the data state. */
        @Override
        protected void updateState(RowData rowData, List<RowData> list) throws Exception {
            AsyncStateAppendOnlyTopNFunction.this.dataState.asyncPut(rowData, list);
        }
    }

    /**
     * Creates the function.
     *
     * <p>All arguments except {@code j} are forwarded unchanged to the superclass constructor.
     *
     * @param stateTtlConfig forwarded to the superclass
     * @param internalTypeInfo type of the input rows; also used to create the row serializer
     * @param generatedRecordComparator forwarded to the superclass
     * @param rowDataKeySelector sort key selector; its produced type becomes the state key type
     * @param rankType forwarded to the superclass
     * @param rankRange forwarded to the superclass
     * @param z forwarded to the superclass (meaning not visible here — see the superclass)
     * @param z2 forwarded to the superclass (meaning not visible here — see the superclass)
     * @param j cache size passed to the helper
     */
    public AsyncStateAppendOnlyTopNFunction(StateTtlConfig stateTtlConfig, InternalTypeInfo<RowData> internalTypeInfo, GeneratedRecordComparator generatedRecordComparator, RowDataKeySelector rowDataKeySelector, RankType rankType, RankRange rankRange, boolean z, boolean z2, long j) {
        super(stateTtlConfig, internalTypeInfo, generatedRecordComparator, rowDataKeySelector, rankType, rankRange, z, z2);
        // NOTE(review): "mo152getProducedType" looks like a decompiler-generated rename of
        // getProducedType(); it is kept so the call matches the selector as declared in this
        // code base — confirm against RowDataKeySelector.
        this.sortKeyType = rowDataKeySelector.mo152getProducedType();
        this.inputRowSer = internalTypeInfo.createSerializer(new SerializerConfigImpl());
        this.cacheSize = j;
    }

    /**
     * Registers the {@code "data-state-with-append"} map state (with TTL when the TTL config is
     * enabled), creates the helper and registers its metric.
     */
    @Override
    public void open(OpenContext openContext) throws Exception {
        super.open(openContext);

        ListTypeInfo<RowData> valueTypeInfo = new ListTypeInfo<>(this.inputRowType);
        MapStateDescriptor<RowData, List<RowData>> descriptor =
                new MapStateDescriptor<>("data-state-with-append", this.sortKeyType, valueTypeInfo);
        if (this.ttlConfig.isEnabled()) {
            descriptor.enableTimeToLive(this.ttlConfig);
        }
        this.dataState = getRuntimeContext().getMapState(descriptor);

        this.helper = new AsyncStateAppendOnlyTopNHelper();
        this.helper.registerMetric();
    }

    /**
     * Processes one input row.
     *
     * <p>Once both the buffer of the current key and the rank end are available, the row is
     * dropped if its sort key fails {@code checkSortKeyInBufferRange}. Otherwise a copy of the row
     * is added to the buffer, the rows of that sort key are written to state, and after the write
     * completes the helper emits the result.
     *
     * @param rowData the input row
     * @param context the process-function context (not used by this method)
     * @param collector receives the rows produced by the helper
     */
    public void processElement(RowData rowData, KeyedProcessFunction<RowData, RowData, RowData>.Context context, Collector<RowData> collector) throws Exception {
        StateFuture<TopNBuffer> bufferFuture = initHeapStates();
        StateFuture<Long> rankEndFuture = initRankEnd(rowData);
        RowData sortKey = (RowData) this.sortKeySelector.getKey(rowData);

        bufferFuture.thenCombine(rankEndFuture, (buffer, rankEnd) -> {
            if (checkSortKeyInBufferRange(sortKey, buffer)) {
                // The buffer keeps a serializer-made copy of the row, not the caller's instance.
                buffer.put(sortKey, this.inputRowSer.copy(rowData));

                // State receives a fresh list, so the buffer's own collection for this sort key
                // is never the object that is handed to the state backend.
                this.dataState.asyncPut(sortKey, new ArrayList<>(buffer.get(sortKey))).thenAccept(ignored -> {
                    // An offset is also routed through the with-row-number path.
                    if (this.outputRankNumber || hasOffset()) {
                        this.helper.processElementWithRowNumber(buffer, sortKey, rowData, rankEnd, collector);
                    } else {
                        this.helper.processElementWithoutRowNumber(buffer, rowData, rankEnd, collector);
                    }
                });
            }
            // The combined future carries no value; only the side effects above matter.
            return null;
        });
    }

    /**
     * Returns the {@link TopNBuffer} of the current key.
     *
     * <p>Every call counts as a request on the helper. A buffer found in the helper's cache counts
     * as a hit and is returned as an already completed future. Otherwise a new buffer is filled
     * from all entries of {@link #dataState}, saved to the cache and then returned.
     */
    private StateFuture<TopNBuffer> initHeapStates() {
        this.helper.accRequestCount();
        RowData currentKey = (RowData) this.keyContext.getCurrentKey();

        TopNBuffer cached = this.helper.getTopNBufferFromCache(currentKey);
        if (cached != null) {
            this.helper.accHitCount();
            return StateFutureUtils.completedFuture(cached);
        }

        TopNBuffer restored = new TopNBuffer(this.sortKeyComparator, ArrayList::new);
        return this.dataState
                .asyncEntries()
                .thenCompose(entries -> {
                    return entries.onNext(entry -> {
                        restored.putAll(entry.getKey(), entry.getValue());
                    });
                })
                .thenApply(ignored -> {
                    this.helper.saveTopNBufferToCache(currentKey, restored);
                    return restored;
                });
    }

    /**
     * Erased-signature entry point that casts its arguments and delegates to
     * {@link #processElement(RowData, KeyedProcessFunction.Context, Collector)}.
     *
     * <p>NOTE(review): the decompiler marks this method as bridge/synthetic; a Java compiler
     * normally generates such a method itself, so this explicit copy may be redundant or may clash
     * when the sources are recompiled — confirm before relying on it.
     */
    public /* bridge */ /* synthetic */ void processElement(Object obj, KeyedProcessFunction.Context context, Collector collector) throws Exception {
        processElement((RowData) obj, (KeyedProcessFunction<RowData, RowData, RowData>.Context) context, (Collector<RowData>) collector);
    }
}
