package org.apache.flink.table.runtime.operators.rank.async;

import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.api.common.state.v2.ValueState;
import org.apache.flink.api.common.state.v2.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.state.StateFutureUtils;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.rank.RankRange;
import org.apache.flink.table.runtime.operators.rank.RankType;
import org.apache.flink.table.runtime.operators.rank.utils.FastTop1Helper;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.util.Collector;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/rank/async/AsyncStateFastTop1Function.class */
public class AsyncStateFastTop1Function extends AbstractAsyncStateTopNFunction implements CheckpointedFunction {
    private static final long serialVersionUID = 1;
    private final TypeSerializer<RowData> inputRowSer;
    private final long cacheSize;
    private transient ValueState<RowData> dataState;
    private transient AsyncStateFastTop1Helper helper;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/runtime/operators/rank/async/AsyncStateFastTop1Function$AsyncStateFastTop1Helper.class */
    public class AsyncStateFastTop1Helper extends FastTop1Helper {
        public AsyncStateFastTop1Helper() {
            super(AsyncStateFastTop1Function.this, AsyncStateFastTop1Function.this.inputRowSer, AsyncStateFastTop1Function.this.cacheSize, AsyncStateFastTop1Function.this.getDefaultTopNSize());
        }

        @Override // org.apache.flink.table.runtime.operators.rank.utils.FastTop1Helper
        public void flushBufferToState(RowData rowData, RowData rowData2) throws Exception {
            this.keyContext.asyncProcessWithKey(rowData, () -> {
                AsyncStateFastTop1Function.this.dataState.asyncUpdate(rowData2);
            });
        }
    }

    public AsyncStateFastTop1Function(StateTtlConfig stateTtlConfig, InternalTypeInfo<RowData> internalTypeInfo, GeneratedRecordComparator generatedRecordComparator, RowDataKeySelector rowDataKeySelector, RankType rankType, RankRange rankRange, boolean z, boolean z2, long j) {
        super(stateTtlConfig, internalTypeInfo, generatedRecordComparator, rowDataKeySelector, rankType, rankRange, z, z2);
        this.inputRowSer = internalTypeInfo.createSerializer(new SerializerConfigImpl());
        this.cacheSize = j;
    }

    @Override // org.apache.flink.table.runtime.operators.rank.async.AbstractAsyncStateTopNFunction, org.apache.flink.table.runtime.operators.rank.AbstractTopNFunction
    public void open(OpenContext openContext) throws Exception {
        super.open(openContext);
        ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("Top1-Rank-State", this.inputRowType);
        if (this.ttlConfig.isEnabled()) {
            valueStateDescriptor.enableTimeToLive(this.ttlConfig);
        }
        this.dataState = getRuntimeContext().getValueState(valueStateDescriptor);
        this.helper = new AsyncStateFastTop1Helper();
        this.helper.registerMetric();
    }

    public void processElement(RowData rowData, KeyedProcessFunction<RowData, RowData, RowData>.Context context, Collector<RowData> collector) throws Exception {
        StateFuture completedFuture;
        this.helper.accRequestCount();
        RowData rowData2 = (RowData) this.keyContext.getCurrentKey();
        RowData prevRowFromCache = this.helper.getPrevRowFromCache(rowData2);
        if (prevRowFromCache == null) {
            completedFuture = this.dataState.asyncValue();
        } else {
            this.helper.accHitCount();
            completedFuture = StateFutureUtils.completedFuture(prevRowFromCache);
        }
        completedFuture.thenAccept(rowData3 -> {
            if (rowData3 == null) {
                this.helper.processAsFirstRow(rowData, rowData2, collector);
            } else {
                this.helper.processWithPrevRow(rowData, rowData2, rowData3, collector);
            }
        });
    }

    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        this.helper.flushAllCacheToState();
    }

    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
    }

    public /* bridge */ /* synthetic */ void processElement(Object obj, KeyedProcessFunction.Context context, Collector collector) throws Exception {
        processElement((RowData) obj, (KeyedProcessFunction<RowData, RowData, RowData>.Context) context, (Collector<RowData>) collector);
    }
}
