/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.aggregate;

import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.JoinedRow;
import org.apache.flink.table.dataformat.util.BaseRowUtil;
import org.apache.flink.table.runtime.context.ExecutionContext;
import org.apache.flink.table.runtime.dataview.PerKeyStateDataViewStore;
import org.apache.flink.table.runtime.generated.AggsHandleFunction;
import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
import org.apache.flink.table.runtime.operators.bundle.MapBundleFunction;
import org.apache.flink.util.Collector;

public class MiniBatchLocalGroupAggFunction
extends MapBundleFunction<BaseRow, BaseRow, BaseRow, BaseRow> {
    private static final long serialVersionUID = 5417039295967495506L;
    private final GeneratedAggsHandleFunction genAggsHandler;
    private transient JoinedRow resultRow = new JoinedRow();
    private transient AggsHandleFunction function = null;

    public MiniBatchLocalGroupAggFunction(GeneratedAggsHandleFunction genAggsHandler) {
        this.genAggsHandler = genAggsHandler;
    }

    @Override
    public void open(ExecutionContext ctx) throws Exception {
        super.open(ctx);
        this.function = (AggsHandleFunction)this.genAggsHandler.newInstance(ctx.getRuntimeContext().getUserCodeClassLoader());
        this.function.open(new PerKeyStateDataViewStore(ctx.getRuntimeContext()));
        this.resultRow = new JoinedRow();
    }

    @Override
    public BaseRow addInput(@Nullable BaseRow previousAcc, BaseRow input) throws Exception {
        BaseRow currentAcc = previousAcc == null ? this.function.createAccumulators() : previousAcc;
        this.function.setAccumulators(currentAcc);
        if (BaseRowUtil.isAccumulateMsg(input)) {
            this.function.accumulate(input);
        } else {
            this.function.retract(input);
        }
        return this.function.getAccumulators();
    }

    @Override
    public void finishBundle(Map<BaseRow, BaseRow> buffer, Collector<BaseRow> out) throws Exception {
        for (Map.Entry<BaseRow, BaseRow> entry : buffer.entrySet()) {
            BaseRow currentKey = entry.getKey();
            BaseRow currentAcc = entry.getValue();
            this.resultRow.replace(currentKey, currentAcc);
            out.collect((Object)this.resultRow);
        }
        buffer.clear();
    }

    @Override
    public void close() throws Exception {
        if (this.function != null) {
            this.function.close();
        }
    }
}

