/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.join.stream;

import java.util.Iterator;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.shaded.guava33.com.google.common.collect.Iterators;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.RowDataUtil;
import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
import org.apache.flink.table.runtime.operators.join.stream.AbstractStreamingJoinOperator;
import org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateView;
import org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews;
import org.apache.flink.table.runtime.operators.join.stream.state.OuterJoinRecordStateView;
import org.apache.flink.table.runtime.operators.join.stream.state.OuterJoinRecordStateViews;
import org.apache.flink.table.runtime.operators.join.stream.utils.JoinInputSideSpec;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.types.RowKind;

/**
 * Streaming operator that evaluates a SEMI join or, when {@code isAntiJoin} is {@code true}, an
 * ANTI join between a left and a right input.
 *
 * <p>Left rows are stored in an {@link OuterJoinRecordStateView} together with the number of right
 * rows they are currently associated with; right rows are stored in a plain
 * {@link JoinRecordStateView}. Only left rows are ever emitted:
 *
 * <ul>
 *   <li>a left input row is forwarded (with its own row kind) when "has at least one associated
 *       right row" differs from {@code isAntiJoin};
 *   <li>a right input row triggers output for an associated left row only when that row's
 *       association count crosses between 0 and 1.
 * </ul>
 *
 * <p>NOTE(review): {@code AbstractStreamingJoinOperator.iterator(...)} is defined outside this
 * file; it presumably yields the records of the given state view that satisfy the join condition
 * against the input row. Confirm against the base class.
 */
public class StreamingSemiAntiJoinOperator
extends AbstractStreamingJoinOperator {
    private static final long serialVersionUID = -3135772379944924519L;

    /** {@code true} for ANTI join semantics, {@code false} for SEMI join semantics. */
    private final boolean isAntiJoin;

    /** Left-side rows plus their association counts; created in {@link #open()}. */
    private transient OuterJoinRecordStateView leftRecordStateView;

    /** Right-side rows; created in {@link #open()}. */
    private transient JoinRecordStateView rightRecordStateView;

    /**
     * Creates the operator.
     *
     * @param isAntiJoin whether to apply ANTI (instead of SEMI) join semantics
     * @param leftType type information of the left input rows
     * @param rightType type information of the right input rows
     * @param generatedJoinCondition generated join condition, passed to the base class
     * @param leftInputSideSpec input side specification of the left input
     * @param rightInputSideSpec input side specification of the right input
     * @param filterNullKeys passed through to the base class unchanged
     * @param leftStateRetentionTime retention time handed to the left state view
     * @param rightStateRetentionTIme retention time handed to the right state view
     */
    public StreamingSemiAntiJoinOperator(boolean isAntiJoin, InternalTypeInfo<RowData> leftType, InternalTypeInfo<RowData> rightType, GeneratedJoinCondition generatedJoinCondition, JoinInputSideSpec leftInputSideSpec, JoinInputSideSpec rightInputSideSpec, boolean[] filterNullKeys, long leftStateRetentionTime, long rightStateRetentionTIme) {
        super(leftType, rightType, generatedJoinCondition, leftInputSideSpec, rightInputSideSpec, filterNullKeys, leftStateRetentionTime, rightStateRetentionTIme);
        this.isAntiJoin = isAntiJoin;
    }

    /**
     * Opens the base operator and then creates the state views, left side first, under the state
     * names {@code "left-records"} and {@code "right-records"}.
     */
    @Override
    public void open() throws Exception {
        super.open();
        // Typed as RuntimeContext so the factory calls see the same static argument type as before.
        RuntimeContext runtimeContext = this.getRuntimeContext();
        this.leftRecordStateView = OuterJoinRecordStateViews.create(
                runtimeContext, "left-records", this.leftInputSideSpec, this.leftType, this.leftStateRetentionTime);
        this.rightRecordStateView = JoinRecordStateViews.create(
                runtimeContext, "right-records", this.rightInputSideSpec, this.rightType, this.rightStateRetentionTime);
    }

    /**
     * Handles a row from the left input: forwards it if the join semantics call for it, then adds
     * it to or retracts it from the left state.
     *
     * @param element record wrapping the left input row
     */
    public void processElement1(StreamRecord<RowData> element) throws Exception {
        RowData leftRow = element.getValue();
        Iterator<AbstractStreamingJoinOperator.OuterRecord> matches =
                AbstractStreamingJoinOperator.iterator(leftRow, true, this.rightRecordStateView, this.joinCondition);

        // SEMI emits when a match exists, ANTI emits when none exists. The row is forwarded
        // with the row kind it arrived with.
        boolean matched = matches.hasNext();
        if (matched != this.isAntiJoin) {
            this.collector.collect(leftRow);
        }

        // Decide accumulate vs. retract before the row kind is erased for state access.
        boolean accumulate = RowDataUtil.isAccumulateMsg(leftRow);
        leftRow.setRowKind(RowKind.INSERT);
        if (accumulate) {
            // hasNext() above did not advance the iterator, so this counts every match.
            this.leftRecordStateView.addRecord(leftRow, Iterators.size(matches));
        } else {
            this.leftRecordStateView.retractRecord(leftRow);
        }
    }

    /**
     * Handles a row from the right input: updates the right state, adjusts the association count
     * of every associated left row, and emits those left rows whose count moves between 0 and 1.
     *
     * @param element record wrapping the right input row
     */
    public void processElement2(StreamRecord<RowData> element) throws Exception {
        RowData rightRow = element.getValue();
        boolean accumulate = RowDataUtil.isAccumulateMsg(rightRow);
        RowKind incomingKind = rightRow.getRowKind();
        // Erase the row kind before the row is used for state lookups and updates.
        rightRow.setRowKind(RowKind.INSERT);
        Iterator<AbstractStreamingJoinOperator.OuterRecord> associatedLeft =
                AbstractStreamingJoinOperator.iterator(rightRow, false, this.leftRecordStateView, this.joinCondition);

        // The accumulate and retract paths mirror each other; they differ only in the count
        // at which a left row changes its output status, the count adjustment, and the row
        // kind the ANTI variant emits.
        final int transitionCount;
        final int delta;
        final RowKind antiJoinKind;
        if (accumulate) {
            this.rightRecordStateView.addRecord(rightRow);
            transitionCount = 0; // first association appears
            delta = 1;
            antiJoinKind = RowKind.DELETE;
        } else {
            this.rightRecordStateView.retractRecord(rightRow);
            transitionCount = 1; // last association disappears
            delta = -1;
            antiJoinKind = RowKind.INSERT;
        }
        // The SEMI variant reuses the row kind of the incoming right row.
        RowKind emittedKind = this.isAntiJoin ? antiJoinKind : incomingKind;

        while (associatedLeft.hasNext()) {
            AbstractStreamingJoinOperator.OuterRecord outer = associatedLeft.next();
            RowData leftRow = outer.record;
            if (outer.numOfAssociations == transitionCount) {
                this.emitLeftRow(leftRow, emittedKind);
            }
            this.leftRecordStateView.updateNumOfAssociations(leftRow, outer.numOfAssociations + delta);
        }
    }

    /**
     * Emits {@code row} with the given row kind and then resets the kind to
     * {@link RowKind#INSERT}, because the same row instance is written back to state afterwards.
     */
    private void emitLeftRow(RowData row, RowKind kind) {
        row.setRowKind(kind);
        this.collector.collect(row);
        row.setRowKind(RowKind.INSERT);
    }
}

