package org.apache.flink.table.runtime.operators.aggregate.asyncwindow.combines;

import java.util.Iterator;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.RowDataUtil;
import org.apache.flink.table.runtime.dataview.UnsupportedStateDataViewStore;
import org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunction;
import org.apache.flink.table.runtime.operators.window.async.tvf.combines.AsyncStateRecordsCombiner;
import org.apache.flink.table.runtime.operators.window.async.tvf.state.WindowAsyncState;
import org.apache.flink.table.runtime.operators.window.async.tvf.state.WindowAsyncValueState;
import org.apache.flink.table.runtime.operators.window.tvf.common.WindowTimerService;
import org.apache.flink.table.runtime.util.TimeWindowUtil;

/**
 * An {@link AsyncStateRecordsCombiner} that folds incoming records into the accumulator kept in
 * a per-window async value state.
 *
 * <p>For each call to {@link #asyncCombine(Long, Iterator)} the stored accumulator of the window
 * is read, every record is either accumulated or retracted through the aggregate handler, and the
 * resulting accumulator is written back to the state.
 */
public class AsyncStateAggCombiner implements AsyncStateRecordsCombiner {

    /** Timer service used to read the current watermark and register window timers. */
    private final WindowTimerService<Long> timerService;

    /** Async state holding one accumulator row per window. */
    private final WindowAsyncValueState<Long> accState;

    /** Handler that performs the accumulate / retract calls on the accumulator. */
    private final NamespaceAggsHandleFunction<Long> aggregator;

    /** Whether an event-time window timer should be registered after combining. */
    private final boolean isEventTime;

    /** Factory that instantiates the generated aggregate handler and builds the combiner. */
    public static final class Factory implements AsyncStateRecordsCombiner.Factory {
        private static final long serialVersionUID = 1L;

        private final GeneratedNamespaceAggsHandleFunction<Long> genAggsHandler;

        /**
         * @param genAggsHandler generated code of the aggregate handler to instantiate for each
         *     combiner
         */
        public Factory(GeneratedNamespaceAggsHandleFunction<Long> genAggsHandler) {
            this.genAggsHandler = genAggsHandler;
        }

        /**
         * Instantiates and opens the aggregate handler, then wraps it in a new combiner.
         *
         * @param runtimeContext supplies the user-code class loader and the data view store context
         * @param timerService timer service handed to the combiner
         * @param windowState window state; it is cast to {@link WindowAsyncValueState}, so any
         *     other implementation fails with a {@link ClassCastException}
         * @param isEventTime whether the combiner should register event-time window timers
         * @return a combiner backed by a freshly opened aggregate handler
         * @throws Exception if instantiating or opening the handler fails
         */
        @Override
        public AsyncStateRecordsCombiner createRecordsCombiner(
                RuntimeContext runtimeContext,
                WindowTimerService<Long> timerService,
                WindowAsyncState<Long> windowState,
                boolean isEventTime)
                throws Exception {
            final NamespaceAggsHandleFunction<Long> aggregator =
                    this.genAggsHandler.newInstance(runtimeContext.getUserCodeClassLoader());
            // NOTE(review): judging by its name, this store presumably rejects state-backed
            // data views - confirm against UnsupportedStateDataViewStore.
            aggregator.open(new UnsupportedStateDataViewStore(runtimeContext));
            final WindowAsyncValueState<Long> windowValueState =
                    (WindowAsyncValueState<Long>) windowState;
            return new AsyncStateAggCombiner(
                    timerService, windowValueState, aggregator, isEventTime);
        }
    }

    /**
     * @param timerService timer service used for the watermark lookup and timer registration
     * @param accState async state storing the accumulator of each window
     * @param aggregator aggregate handler; {@link #close()} closes it
     * @param isEventTime whether event-time window timers should be registered
     */
    public AsyncStateAggCombiner(
            WindowTimerService<Long> timerService,
            WindowAsyncValueState<Long> accState,
            NamespaceAggsHandleFunction<Long> aggregator,
            boolean isEventTime) {
        this.timerService = timerService;
        this.accState = accState;
        this.aggregator = aggregator;
        this.isEventTime = isEventTime;
    }

    /**
     * Merges the given records into the accumulator stored for {@code window}.
     *
     * <p>The timer check at the end is issued directly from this method; it is not chained onto
     * the returned state future.
     *
     * @param window the window the records belong to; also passed to {@link
     *     TimeWindowUtil#isWindowFired} as the value compared against the current watermark
     * @param records records to apply; consumed inside the state callback
     * @return a future that completes once the updated accumulator has been written to state
     * @throws Exception if the state access or the timer registration fails
     */
    @Override
    public StateFuture<Void> asyncCombine(Long window, Iterator<RowData> records)
            throws Exception {
        final StateFuture<Void> resultFuture =
                this.accState
                        .asyncValue(window)
                        .thenCompose(
                                storedAcc -> {
                                    // A window without state yet starts from a fresh accumulator.
                                    final RowData acc =
                                            storedAcc != null
                                                    ? storedAcc
                                                    : this.aggregator.createAccumulators();
                                    this.aggregator.setAccumulators(window, acc);

                                    // The record variable must not reuse the lambda parameter's
                                    // name: Java rejects a local that shadows it.
                                    while (records.hasNext()) {
                                        final RowData record = records.next();
                                        if (RowDataUtil.isAccumulateMsg(record)) {
                                            this.aggregator.accumulate(record);
                                        } else {
                                            this.aggregator.retract(record);
                                        }
                                    }

                                    return this.accState.asyncUpdate(
                                            window, this.aggregator.getAccumulators());
                                });

        // Only register a timer for a window that has not fired yet.
        if (this.isEventTime
                && !TimeWindowUtil.isWindowFired(
                        window,
                        this.timerService.currentWatermark(),
                        this.timerService.getShiftTimeZone())) {
            this.timerService.registerEventTimeWindowTimer(window);
        }
        return resultFuture;
    }

    /**
     * Closes the underlying aggregate handler.
     *
     * @throws Exception if closing the handler fails
     */
    @Override
    public void close() throws Exception {
        this.aggregator.close();
    }
}
