package org.apache.flink.table.runtime.operators.join.window.asyncprocessing;

import java.time.ZoneId;
import org.apache.flink.api.common.functions.DefaultOpenContext;
import org.apache.flink.api.common.state.v2.ListState;
import org.apache.flink.api.common.state.v2.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.core.state.StateFutureUtils;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.KeyContext;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
import org.apache.flink.table.runtime.operators.AsyncStateTableStreamOperator;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.operators.join.JoinConditionWithNullFilters;
import org.apache.flink.table.runtime.operators.join.window.utils.WindowJoinHelper;
import org.apache.flink.table.runtime.operators.window.tvf.asyncprocessing.state.WindowListAsyncState;
import org.apache.flink.table.runtime.operators.window.tvf.common.WindowTimerService;
import org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingWindowTimerServiceImpl;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/join/window/asyncprocessing/AsyncStateWindowJoinOperator.class */
public class AsyncStateWindowJoinOperator extends AsyncStateTableStreamOperator<RowData> implements TwoInputStreamOperator<RowData, RowData, RowData>, Triggerable<RowData, Long>, KeyContext {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(AsyncStateWindowJoinOperator.class);
    private static final String LEFT_RECORDS_STATE_NAME = "left-records";
    private static final String RIGHT_RECORDS_STATE_NAME = "right-records";
    private final RowDataSerializer leftSerializer;
    private final RowDataSerializer rightSerializer;
    private final GeneratedJoinCondition generatedJoinCondition;
    private final int leftWindowEndIndex;
    private final int rightWindowEndIndex;
    private final boolean[] filterNullKeys;
    private final ZoneId shiftTimeZone;
    private final FlinkJoinType joinType;
    private transient WindowTimerService<Long> windowTimerService;
    private transient JoinConditionWithNullFilters joinCondition;
    private transient TimestampedCollector<RowData> collector;
    private transient WindowListAsyncState<Long> leftWindowState;
    private transient WindowListAsyncState<Long> rightWindowState;
    private transient WindowJoinHelper helper;

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/join/window/asyncprocessing/AsyncStateWindowJoinOperator$AsyncStateWindowJoinHelper.class */
    private class AsyncStateWindowJoinHelper extends WindowJoinHelper {
        public AsyncStateWindowJoinHelper() {
            super(AsyncStateWindowJoinOperator.this.leftSerializer, AsyncStateWindowJoinOperator.this.rightSerializer, AsyncStateWindowJoinOperator.this.shiftTimeZone, AsyncStateWindowJoinOperator.this.windowTimerService, AsyncStateWindowJoinOperator.this.joinCondition, AsyncStateWindowJoinOperator.this.collector, AsyncStateWindowJoinOperator.this.joinType);
        }

        @Override // org.apache.flink.table.runtime.operators.join.window.utils.WindowJoinHelper
        public void clearState(long j, boolean z) {
            if (z) {
                AsyncStateWindowJoinOperator.this.leftWindowState.asyncClear(Long.valueOf(j));
            } else {
                AsyncStateWindowJoinOperator.this.rightWindowState.asyncClear(Long.valueOf(j));
            }
        }
    }

    public AsyncStateWindowJoinOperator(TypeSerializer<RowData> typeSerializer, TypeSerializer<RowData> typeSerializer2, GeneratedJoinCondition generatedJoinCondition, int i, int i2, boolean[] zArr, ZoneId zoneId, FlinkJoinType flinkJoinType) {
        this.leftSerializer = (RowDataSerializer) typeSerializer;
        this.rightSerializer = (RowDataSerializer) typeSerializer2;
        this.generatedJoinCondition = generatedJoinCondition;
        this.leftWindowEndIndex = i;
        this.rightWindowEndIndex = i2;
        this.filterNullKeys = zArr;
        this.shiftTimeZone = zoneId;
        this.joinType = flinkJoinType;
    }

    @Override // org.apache.flink.table.runtime.operators.AsyncStateTableStreamOperator
    public void open() throws Exception {
        super.open();
        LOG.info("Window join is using async state");
        this.collector = new TimestampedCollector<>(this.output);
        this.collector.eraseTimestamp();
        LongSerializer longSerializer = LongSerializer.INSTANCE;
        this.windowTimerService = new SlicingWindowTimerServiceImpl(getInternalTimerService("window-timers", longSerializer, this), this.shiftTimeZone);
        this.joinCondition = new JoinConditionWithNullFilters(this.generatedJoinCondition.newInstance(getRuntimeContext().getUserCodeClassLoader()), this.filterNullKeys, this);
        this.joinCondition.setRuntimeContext(getRuntimeContext());
        this.joinCondition.open(DefaultOpenContext.INSTANCE);
        this.leftWindowState = new WindowListAsyncState<>((ListState) getOrCreateKeyedState(Long.MIN_VALUE, longSerializer, new ListStateDescriptor(LEFT_RECORDS_STATE_NAME, this.leftSerializer)));
        this.rightWindowState = new WindowListAsyncState<>((ListState) getOrCreateKeyedState(Long.MIN_VALUE, longSerializer, new ListStateDescriptor(RIGHT_RECORDS_STATE_NAME, this.rightSerializer)));
        this.helper = new AsyncStateWindowJoinHelper();
        this.helper.registerMetric(getRuntimeContext().getMetricGroup());
    }

    public void close() throws Exception {
        super.close();
        this.collector = null;
        if (this.joinCondition != null) {
            this.joinCondition.close();
        }
    }

    public void processElement1(StreamRecord<RowData> streamRecord) throws Exception {
        this.helper.processElement(streamRecord, this.leftWindowEndIndex, this.helper.getLeftLateRecordsDroppedRate(), (l, rowData) -> {
            this.leftWindowState.asyncAdd(l, rowData);
        });
    }

    public void processElement2(StreamRecord<RowData> streamRecord) throws Exception {
        this.helper.processElement(streamRecord, this.rightWindowEndIndex, this.helper.getRightLateRecordsDroppedRate(), (l, rowData) -> {
            this.rightWindowState.asyncAdd(l, rowData);
        });
    }

    public void onProcessingTime(InternalTimer<RowData, Long> internalTimer) throws Exception {
        throw new UnsupportedOperationException("This is a bug and should not happen. Please file an issue.");
    }

    public void onEventTime(InternalTimer<RowData, Long> internalTimer) throws Exception {
        triggerJoin(((Long) internalTimer.getNamespace()).longValue());
    }

    private void triggerJoin(long j) {
        StateFutureUtils.toIterable(this.leftWindowState.asyncGet(Long.valueOf(j))).thenCombine(StateFutureUtils.toIterable(this.rightWindowState.asyncGet(Long.valueOf(j))), (iterable, iterable2) -> {
            this.helper.joinAndClear(j, iterable, iterable2);
            return null;
        });
    }
}
