/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.over;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.JoinedRow;
import org.apache.flink.table.runtime.dataview.PerKeyStateDataViewStore;
import org.apache.flink.table.runtime.generated.AggsHandleFunction;
import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.generated.RecordComparator;
import org.apache.flink.table.runtime.operators.TableStreamOperator;
import org.apache.flink.table.runtime.typeutils.AbstractRowSerializer;
import org.apache.flink.table.runtime.util.StreamRecordCollector;

public class NonBufferOverWindowOperator
extends TableStreamOperator<BaseRow>
implements OneInputStreamOperator<BaseRow, BaseRow> {
    private GeneratedAggsHandleFunction[] aggsHandlers;
    private GeneratedRecordComparator genComparator;
    private final boolean[] resetAccumulators;
    private RecordComparator partitionComparator;
    private BaseRow lastInput;
    private AggsHandleFunction[] processors;
    private JoinedRow[] joinedRows;
    private StreamRecordCollector<BaseRow> collector;
    private AbstractRowSerializer<BaseRow> serializer;

    public NonBufferOverWindowOperator(GeneratedAggsHandleFunction[] aggsHandlers, GeneratedRecordComparator genComparator, boolean[] resetAccumulators) {
        this.aggsHandlers = aggsHandlers;
        this.genComparator = genComparator;
        this.resetAccumulators = resetAccumulators;
    }

    public void open() throws Exception {
        super.open();
        ClassLoader cl = this.getUserCodeClassloader();
        this.serializer = (AbstractRowSerializer)this.getOperatorConfig().getTypeSerializerIn1(cl);
        this.partitionComparator = (RecordComparator)this.genComparator.newInstance(cl);
        this.genComparator = null;
        this.collector = new StreamRecordCollector(this.output);
        this.processors = new AggsHandleFunction[this.aggsHandlers.length];
        this.joinedRows = new JoinedRow[this.aggsHandlers.length];
        for (int i = 0; i < this.aggsHandlers.length; ++i) {
            AggsHandleFunction func = (AggsHandleFunction)this.aggsHandlers[i].newInstance(cl);
            func.open(new PerKeyStateDataViewStore((RuntimeContext)this.getRuntimeContext()));
            this.processors[i] = func;
            this.joinedRows[i] = new JoinedRow();
        }
        this.aggsHandlers = null;
    }

    public void processElement(StreamRecord<BaseRow> element) throws Exception {
        BaseRow input = (BaseRow)element.getValue();
        boolean changePartition = this.lastInput == null || this.partitionComparator.compare(this.lastInput, input) != 0;
        BaseRow output = input;
        for (int i = 0; i < this.processors.length; ++i) {
            AggsHandleFunction processor = this.processors[i];
            if (changePartition || this.resetAccumulators[i]) {
                processor.setAccumulators(processor.createAccumulators());
            }
            processor.accumulate(input);
            BaseRow value = processor.getValue();
            output = this.joinedRows[i].replace(output, value);
        }
        this.collector.collect(output);
        if (changePartition) {
            this.lastInput = (BaseRow)this.serializer.copy(input);
        }
    }
}

