package org.apache.flink.table.runtime.operators.join.stream.asyncprocessing;

import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
import org.apache.flink.table.runtime.operators.join.stream.asyncprocessing.state.JoinRecordAsyncStateView;
import org.apache.flink.table.runtime.operators.join.stream.asyncprocessing.state.JoinRecordAsyncStateViews;
import org.apache.flink.table.runtime.operators.join.stream.asyncprocessing.state.OuterJoinRecordAsyncStateView;
import org.apache.flink.table.runtime.operators.join.stream.asyncprocessing.state.OuterJoinRecordAsyncStateViews;
import org.apache.flink.table.runtime.operators.join.stream.utils.AssociatedRecords;
import org.apache.flink.table.runtime.operators.join.stream.utils.JoinHelper;
import org.apache.flink.table.runtime.operators.join.stream.utils.JoinInputSideSpec;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.types.RowKind;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/join/stream/asyncprocessing/AsyncStateStreamingJoinOperator.class */
/**
 * Streaming join operator whose per-side join records are kept in asynchronous state views.
 *
 * <p>For every incoming record the operator asynchronously looks up the associated records of the
 * opposite input and, once that lookup completes, delegates the join processing to an
 * {@link AsyncStateJoinHelper}.
 */
public class AsyncStateStreamingJoinOperator extends AbstractAsyncStateStreamingJoinOperator {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(AsyncStateStreamingJoinOperator.class);

    /** When {@code true}, the left input is backed by an outer-join state view. */
    private final boolean leftIsOuter;

    /** When {@code true}, the right input is backed by an outer-join state view. */
    private final boolean rightIsOuter;

    /** Joined row instance handed to the join helper; created in {@link #open()}. */
    private transient JoinedRowData outRow;

    /** Row with as many fields as the left type (presumably used for null padding; confirm in JoinHelper). */
    private transient RowData leftNullRow;

    /** Row with as many fields as the right type (presumably used for null padding; confirm in JoinHelper). */
    private transient RowData rightNullRow;

    /** State view holding the records of the left input (registered as "left-records"). */
    private transient JoinRecordAsyncStateView leftRecordAsyncStateView;

    /** State view holding the records of the right input (registered as "right-records"). */
    private transient JoinRecordAsyncStateView rightRecordAsyncStateView;

    /** Helper performing the join processing once associated records are available. */
    private transient AsyncStateJoinHelper joinHelper;

    /**
     * {@link JoinHelper} specialization that forwards every state mutation to the asynchronous
     * state view it is given.
     */
    private class AsyncStateJoinHelper extends JoinHelper<JoinRecordAsyncStateView, OuterJoinRecordAsyncStateView> {

        /**
         * Builds the helper from the enclosing operator's current outer flags, reusable rows and
         * collector. The enclosing fields are read at construction time, so they must already be
         * initialized.
         */
        public AsyncStateJoinHelper() {
            // Explicit outer-instance qualification is kept on purpose: these expressions are
            // evaluated before the superclass constructor has run.
            super(
                    AsyncStateStreamingJoinOperator.this.leftIsOuter,
                    AsyncStateStreamingJoinOperator.this.rightIsOuter,
                    AsyncStateStreamingJoinOperator.this.outRow,
                    AsyncStateStreamingJoinOperator.this.leftNullRow,
                    AsyncStateStreamingJoinOperator.this.rightNullRow,
                    AsyncStateStreamingJoinOperator.this.collector);
        }

        /** Adds {@code rowData} to the given state view. */
        @Override
        public void addRecord(JoinRecordAsyncStateView joinRecordAsyncStateView, RowData rowData) throws Exception {
            joinRecordAsyncStateView.addRecord(rowData);
        }

        /** Retracts {@code rowData} from the given state view. */
        @Override
        public void retractRecord(JoinRecordAsyncStateView joinRecordAsyncStateView, RowData rowData) throws Exception {
            joinRecordAsyncStateView.retractRecord(rowData);
        }

        /** Adds {@code rowData} to the given outer-side state view, passing {@code i} along with it. */
        @Override
        public void addRecordInOuterSide(OuterJoinRecordAsyncStateView outerJoinRecordAsyncStateView, RowData rowData, int i) throws Exception {
            outerJoinRecordAsyncStateView.addRecord(rowData, i);
        }

        /** Updates the number of associations stored for {@code rowData} in the outer-side state view to {@code i}. */
        @Override
        public void updateNumOfAssociationsInOuterSide(OuterJoinRecordAsyncStateView outerJoinRecordAsyncStateView, RowData rowData, int i) throws Exception {
            outerJoinRecordAsyncStateView.updateNumOfAssociations(rowData, i);
        }
    }

    /**
     * Creates the operator.
     *
     * <p>All arguments except {@code z} and {@code z2} are forwarded unchanged to the superclass
     * constructor.
     *
     * @param z stored as the "left is outer" flag
     * @param z2 stored as the "right is outer" flag
     */
    public AsyncStateStreamingJoinOperator(InternalTypeInfo<RowData> internalTypeInfo, InternalTypeInfo<RowData> internalTypeInfo2, GeneratedJoinCondition generatedJoinCondition, JoinInputSideSpec joinInputSideSpec, JoinInputSideSpec joinInputSideSpec2, boolean z, boolean z2, boolean[] zArr, long j, long j2) {
        super(internalTypeInfo, internalTypeInfo2, generatedJoinCondition, joinInputSideSpec, joinInputSideSpec2, zArr, j, j2);
        leftIsOuter = z;
        rightIsOuter = z2;
    }

    /**
     * Initializes the reusable rows, creates the state view for each input (an outer-join view
     * when that side is outer, a plain join view otherwise) and finally builds the join helper.
     */
    @Override
    public void open() throws Exception {
        super.open();
        LOG.info("Join is using async state");

        outRow = new JoinedRowData();
        leftNullRow = new GenericRowData(leftType.toRowSize());
        rightNullRow = new GenericRowData(rightType.toRowSize());

        leftRecordAsyncStateView =
                leftIsOuter
                        ? OuterJoinRecordAsyncStateViews.create(getRuntimeContext(), "left-records", leftInputSideSpec, leftType, leftStateRetentionTime)
                        : JoinRecordAsyncStateViews.create(getRuntimeContext(), "left-records", leftInputSideSpec, leftType, leftStateRetentionTime);

        rightRecordAsyncStateView =
                rightIsOuter
                        ? OuterJoinRecordAsyncStateViews.create(getRuntimeContext(), "right-records", rightInputSideSpec, rightType, rightStateRetentionTime)
                        : JoinRecordAsyncStateViews.create(getRuntimeContext(), "right-records", rightInputSideSpec, rightType, rightStateRetentionTime);

        // Must come last: the helper's constructor reads the fields initialized above.
        joinHelper = new AsyncStateJoinHelper();
    }

    /** Processes a record of the first (left) input: left view is the input side, right view the other side. */
    public void processElement1(StreamRecord<RowData> streamRecord) throws Exception {
        RowData leftInput = streamRecord.getValue();
        doProcessElement(leftInput, leftRecordAsyncStateView, rightRecordAsyncStateView, true);
    }

    /** Processes a record of the second (right) input: right view is the input side, left view the other side. */
    public void processElement2(StreamRecord<RowData> streamRecord) throws Exception {
        RowData rightInput = streamRecord.getValue();
        doProcessElement(rightInput, rightRecordAsyncStateView, leftRecordAsyncStateView, false);
    }

    /**
     * Looks up the records associated with {@code input} on the other side and, when the lookup
     * completes, hands everything to the join helper.
     *
     * @param input the incoming record
     * @param inputSideView state view of the side {@code input} arrived on
     * @param otherSideView state view of the opposite side, used for the lookup
     * @param inputIsLeft {@code true} when {@code input} comes from the left input
     */
    private void doProcessElement(RowData input, JoinRecordAsyncStateView inputSideView, JoinRecordAsyncStateView otherSideView, boolean inputIsLeft) throws Exception {
        // The row kind is switched to INSERT only while the lookup is issued and is restored
        // before the callback is registered, so the helper sees the original kind.
        // NOTE(review): presumably the lookup needs a kind-neutral row; confirm in AssociatedRecords.
        final RowKind originalKind = input.getRowKind();
        input.setRowKind(RowKind.INSERT);
        final StateFuture<AssociatedRecords> associatedFuture =
                AssociatedRecords.fromAsyncStateView(input, inputIsLeft, otherSideView, joinCondition);
        input.setRowKind(originalKind);

        // The trailing 'false' is passed through as-is; its meaning is defined by JoinHelper#processJoin.
        associatedFuture.thenAccept(
                matches -> joinHelper.processJoin(input, inputSideView, otherSideView, inputIsLeft, matches, false));
    }
}
