package org.apache.flink.table.runtime.operators.aggregate.asyncwindow.buffers;

import java.io.EOFException;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
import org.apache.flink.table.runtime.generated.RecordEqualiser;
import org.apache.flink.table.runtime.operators.aggregate.asyncwindow.buffers.AsyncStateWindowBuffer;
import org.apache.flink.table.runtime.operators.window.async.tvf.combines.AsyncStateRecordsCombiner;
import org.apache.flink.table.runtime.operators.window.async.tvf.state.AsyncStateKeyContext;
import org.apache.flink.table.runtime.operators.window.async.tvf.state.WindowAsyncState;
import org.apache.flink.table.runtime.operators.window.tvf.common.WindowTimerService;
import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer;
import org.apache.flink.table.runtime.typeutils.WindowKeySerializer;
import org.apache.flink.table.runtime.util.AsyncStateUtils;
import org.apache.flink.table.runtime.util.KeyValueIterator;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.table.runtime.util.WindowKey;
import org.apache.flink.table.runtime.util.collections.binary.WindowBytesMultiMap;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/aggregate/asyncwindow/buffers/AsyncStateRecordsWindowBuffer.class */
public final class AsyncStateRecordsWindowBuffer implements AsyncStateWindowBuffer {
    private final AsyncStateRecordsCombiner combineFunction;
    private final WindowBytesMultiMap recordsBuffer;
    private final WindowKey reuseWindowKey;
    private final AbstractRowDataSerializer<RowData> recordSerializer;
    private final ZoneId shiftTimeZone;
    private final RecordEqualiser keyEqualiser;
    private final AsyncStateKeyContext keyContext;
    private long minSliceEnd = Long.MAX_VALUE;

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/aggregate/asyncwindow/buffers/AsyncStateRecordsWindowBuffer$Factory.class */
    public static final class Factory implements AsyncStateWindowBuffer.Factory {
        private static final long serialVersionUID = 1;
        private final PagedTypeSerializer<RowData> keySer;
        private final AbstractRowDataSerializer<RowData> inputSer;
        private final AsyncStateRecordsCombiner.Factory factory;
        private final GeneratedRecordEqualiser generatedKeyEqualiser;

        public Factory(PagedTypeSerializer<RowData> pagedTypeSerializer, AbstractRowDataSerializer<RowData> abstractRowDataSerializer, AsyncStateRecordsCombiner.Factory factory, GeneratedRecordEqualiser generatedRecordEqualiser) {
            this.keySer = pagedTypeSerializer;
            this.inputSer = abstractRowDataSerializer;
            this.factory = factory;
            this.generatedKeyEqualiser = generatedRecordEqualiser;
        }

        @Override // org.apache.flink.table.runtime.operators.aggregate.asyncwindow.buffers.AsyncStateWindowBuffer.Factory
        public AsyncStateWindowBuffer create(Object obj, MemoryManager memoryManager, long j, RuntimeContext runtimeContext, WindowTimerService<Long> windowTimerService, AsyncStateKeyContext asyncStateKeyContext, WindowAsyncState<Long> windowAsyncState, boolean z, ZoneId zoneId) throws Exception {
            return new AsyncStateRecordsWindowBuffer(obj, memoryManager, j, this.factory.createRecordsCombiner(runtimeContext, windowTimerService, windowAsyncState, z), this.keySer, this.inputSer, this.generatedKeyEqualiser.newInstance(runtimeContext.getUserCodeClassLoader()), asyncStateKeyContext, zoneId);
        }
    }

    public AsyncStateRecordsWindowBuffer(Object obj, MemoryManager memoryManager, long j, AsyncStateRecordsCombiner asyncStateRecordsCombiner, PagedTypeSerializer<RowData> pagedTypeSerializer, AbstractRowDataSerializer<RowData> abstractRowDataSerializer, RecordEqualiser recordEqualiser, AsyncStateKeyContext asyncStateKeyContext, ZoneId zoneId) {
        this.combineFunction = asyncStateRecordsCombiner;
        this.recordsBuffer = new WindowBytesMultiMap(obj, memoryManager, j, pagedTypeSerializer, abstractRowDataSerializer.getArity());
        this.recordSerializer = abstractRowDataSerializer;
        this.keyEqualiser = recordEqualiser;
        this.keyContext = asyncStateKeyContext;
        this.reuseWindowKey = new WindowKeySerializer(pagedTypeSerializer).m270createInstance();
        this.shiftTimeZone = zoneId;
    }

    @Override // org.apache.flink.table.runtime.operators.aggregate.asyncwindow.buffers.AsyncStateWindowBuffer
    public StateFuture<Void> addElement(RowData rowData, long j, RowData rowData2) throws Exception {
        StateFuture<Void> stateFuture = AsyncStateUtils.REUSABLE_VOID_STATE_FUTURE;
        this.minSliceEnd = Math.min(j, this.minSliceEnd);
        this.reuseWindowKey.replace(j, rowData);
        try {
            this.recordsBuffer.append(this.recordsBuffer.lookup(this.reuseWindowKey), this.recordSerializer.toBinaryRow(rowData2));
        } catch (EOFException e) {
            stateFuture = flush(rowData);
            addElement(rowData, j, rowData2);
        }
        return stateFuture;
    }

    @Override // org.apache.flink.table.runtime.operators.aggregate.asyncwindow.buffers.AsyncStateWindowBuffer
    public StateFuture<Void> advanceProgress(@Nullable RowData rowData, long j) throws Exception {
        return TimeWindowUtil.isWindowFired(this.minSliceEnd, j, this.shiftTimeZone) ? flush(rowData) : AsyncStateUtils.REUSABLE_VOID_STATE_FUTURE;
    }

    @Override // org.apache.flink.table.runtime.operators.aggregate.asyncwindow.buffers.AsyncStateWindowBuffer
    public StateFuture<Void> flush(@Nullable RowData rowData) throws Exception {
        StateFuture<Void> stateFuture = AsyncStateUtils.REUSABLE_VOID_STATE_FUTURE;
        if (this.recordsBuffer.getNumKeys() > 0) {
            KeyValueIterator<WindowKey, Iterator<RowData>> entryIterator = this.recordsBuffer.getEntryIterator(true);
            while (entryIterator.advanceNext()) {
                WindowKey key = entryIterator.getKey();
                long window = key.getWindow();
                List<RowData> itertorToList = itertorToList(entryIterator.getValue());
                if (rowData == null || !this.keyEqualiser.equals(rowData, key.getKey())) {
                    this.keyContext.asyncProcessWithKey(key.getKey(), () -> {
                        this.combineFunction.asyncCombine(Long.valueOf(window), itertorToList.iterator());
                    });
                } else {
                    stateFuture = this.combineFunction.asyncCombine(Long.valueOf(window), itertorToList.iterator());
                }
            }
            this.recordsBuffer.reset();
            this.minSliceEnd = Long.MAX_VALUE;
        }
        return stateFuture;
    }

    private List<RowData> itertorToList(Iterator<RowData> it) {
        ArrayList arrayList = new ArrayList();
        while (it.hasNext()) {
            arrayList.add(it.next());
        }
        return arrayList;
    }

    @Override // org.apache.flink.table.runtime.operators.aggregate.asyncwindow.buffers.AsyncStateWindowBuffer
    public void close() throws Exception {
        this.recordsBuffer.free();
        this.combineFunction.close();
    }
}
