package org.apache.flink.table.runtime.operators.aggregate.asyncwindow.processors;

import java.io.Serializable;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.state.StateFutureUtils;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
import org.apache.flink.table.runtime.operators.aggregate.asyncwindow.buffers.AsyncStateWindowBuffer;
import org.apache.flink.table.runtime.operators.window.async.AsyncMergeCallback;
import org.apache.flink.table.runtime.operators.window.async.tvf.common.AsyncStateWindowProcessor;
import org.apache.flink.table.runtime.operators.window.tvf.slicing.SliceSharedAssigner;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/aggregate/asyncwindow/processors/AsyncStateSliceSharedWindowAggProcessor.class */
public final class AsyncStateSliceSharedWindowAggProcessor extends AbstractAsyncStateSliceWindowAggProcessor implements AsyncMergeCallback<Long, Iterable<Long>> {
    private static final long serialVersionUID = 1;
    private final SliceSharedAssigner sliceSharedAssigner;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/runtime/operators/aggregate/asyncwindow/processors/AsyncStateSliceSharedWindowAggProcessor$SliceMergeTargetHelper.class */
    public static final class SliceMergeTargetHelper implements AsyncMergeCallback<Long, Iterable<Long>>, Serializable {
        private static final long serialVersionUID = 1;
        private static final StateFuture<Tuple2<RowData, RowData>> REUSABLE_FUTURE_RESULT = StateFutureUtils.completedFuture((Object) null);
        private Long mergeTarget = null;

        private SliceMergeTargetHelper() {
        }

        @Override // org.apache.flink.table.runtime.operators.window.async.AsyncMergeCallback
        public StateFuture<Tuple2<RowData, RowData>> asyncMerge(@Nullable Long l, Iterable<Long> iterable, Long l2) {
            this.mergeTarget = l;
            return REUSABLE_FUTURE_RESULT;
        }

        public Long getMergeTarget() {
            return this.mergeTarget;
        }
    }

    public AsyncStateSliceSharedWindowAggProcessor(GeneratedNamespaceAggsHandleFunction<Long> generatedNamespaceAggsHandleFunction, AsyncStateWindowBuffer.Factory factory, SliceSharedAssigner sliceSharedAssigner, TypeSerializer<RowData> typeSerializer, int i, ZoneId zoneId) {
        super(generatedNamespaceAggsHandleFunction, factory, sliceSharedAssigner, typeSerializer, i, zoneId);
        this.sliceSharedAssigner = sliceSharedAssigner;
    }

    @Override // org.apache.flink.table.runtime.operators.window.async.tvf.common.AsyncStateWindowProcessor
    public StateFuture<Void> fireWindow(long j, Long l) throws Exception {
        return this.sliceSharedAssigner.asyncMergeSlices(l.longValue(), this).thenAccept(tuple2 -> {
            if (!this.emptyChecker.apply((RowData) tuple2.f0).booleanValue()) {
                collect(((AsyncStateWindowProcessor.AsyncStateContext) this.ctx).getAsyncKeyContext().getCurrentKey(), (RowData) tuple2.f1);
            }
            Optional<Long> nextTriggerWindow = this.sliceSharedAssigner.nextTriggerWindow(l.longValue(), (RowData) tuple2.f0, this.emptyChecker);
            if (nextTriggerWindow.isPresent()) {
                long longValue = nextTriggerWindow.get().longValue();
                if (this.sliceSharedAssigner.isEventTime()) {
                    this.windowTimerService.registerEventTimeWindowTimer(Long.valueOf(longValue));
                } else {
                    this.windowTimerService.registerProcessingTimeWindowTimer(Long.valueOf(longValue));
                }
            }
        });
    }

    @Override // org.apache.flink.table.runtime.operators.window.async.AsyncMergeCallback
    public StateFuture<Tuple2<RowData, RowData>> asyncMerge(@Nullable Long l, Iterable<Long> iterable, Long l2) throws Exception {
        return (l == null ? StateFutureUtils.completedFuture(this.aggregator.createAccumulators()) : this.windowState.asyncValue(l).thenApply(rowData -> {
            return rowData == null ? this.aggregator.createAccumulators() : rowData;
        })).thenCombine(collectAccOfSlicesToBeMerged(iterable), (rowData2, collection) -> {
            this.aggregator.setAccumulators(l, rowData2);
            Iterator it = collection.iterator();
            while (it.hasNext()) {
                Tuple2 tuple2 = (Tuple2) it.next();
                RowData rowData2 = (RowData) tuple2.f1;
                if (rowData2 != null) {
                    this.aggregator.merge((Long) tuple2.f0, rowData2);
                }
            }
            return Tuple2.of(this.aggregator.getAccumulators(), this.aggregator.getValue(l2));
        }).thenCompose(tuple2 -> {
            return l != null ? this.windowState.asyncUpdate(l, (RowData) tuple2.f0).thenApply(r3 -> {
                return tuple2;
            }) : StateFutureUtils.completedFuture(tuple2);
        });
    }

    private StateFuture<Collection<Tuple2<Long, RowData>>> collectAccOfSlicesToBeMerged(Iterable<Long> iterable) throws Exception {
        ArrayList arrayList = new ArrayList();
        for (Long l : iterable) {
            arrayList.add(this.windowState.asyncValue(l).thenApply(rowData -> {
                return Tuple2.of(l, rowData);
            }));
        }
        return StateFutureUtils.combineAll(arrayList);
    }

    @Override // org.apache.flink.table.runtime.operators.aggregate.asyncwindow.processors.AbstractAsyncStateSliceWindowAggProcessor
    protected StateFuture<Long> sliceStateMergeTarget(long j) throws Exception {
        SliceMergeTargetHelper sliceMergeTargetHelper = new SliceMergeTargetHelper();
        return this.sliceSharedAssigner.asyncMergeSlices(j, sliceMergeTargetHelper).thenApply(tuple2 -> {
            return sliceMergeTargetHelper.getMergeTarget() != null ? sliceMergeTargetHelper.getMergeTarget() : Long.valueOf(j);
        });
    }
}
