package org.apache.flink.table.runtime.operators.join.temporal;

import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.RowDataUtil;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
import org.apache.flink.table.runtime.generated.JoinCondition;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperator.class */
public class TemporalProcessTimeJoinOperator extends BaseTwoInputStreamOperatorWithStateRetention {
    private static final long serialVersionUID = -5182289624027523612L;
    private final boolean isLeftOuterJoin;
    private final InternalTypeInfo<RowData> rightType;
    private final GeneratedJoinCondition generatedJoinCondition;
    private transient ValueState<RowData> rightState;
    private transient JoinCondition joinCondition;
    private transient JoinedRowData outRow;
    private transient GenericRowData rightNullRow;
    private transient TimestampedCollector<RowData> collector;

    public TemporalProcessTimeJoinOperator(InternalTypeInfo<RowData> internalTypeInfo, GeneratedJoinCondition generatedJoinCondition, long j, long j2, boolean z) {
        super(j, j2);
        this.rightType = internalTypeInfo;
        this.generatedJoinCondition = generatedJoinCondition;
        this.isLeftOuterJoin = z;
    }

    @Override // org.apache.flink.table.runtime.operators.join.temporal.BaseTwoInputStreamOperatorWithStateRetention
    public void open() throws Exception {
        super.open();
        this.joinCondition = this.generatedJoinCondition.newInstance(getRuntimeContext().getUserCodeClassLoader());
        FunctionUtils.setFunctionRuntimeContext(this.joinCondition, getRuntimeContext());
        FunctionUtils.openFunction(this.joinCondition, new Configuration());
        this.rightState = getRuntimeContext().getState(new ValueStateDescriptor("right", this.rightType));
        this.collector = new TimestampedCollector<>(this.output);
        this.outRow = new JoinedRowData();
        this.rightNullRow = new GenericRowData(this.rightType.toRowSize());
        super.processWatermark2(Watermark.MAX_WATERMARK);
    }

    public void processElement1(StreamRecord<RowData> streamRecord) throws Exception {
        RowData rowData = (RowData) streamRecord.getValue();
        RowData rowData2 = (RowData) this.rightState.value();
        if (rowData2 == null) {
            if (this.isLeftOuterJoin) {
                collectJoinedRow(rowData, this.rightNullRow);
            }
        } else {
            if (this.joinCondition.apply(rowData, rowData2)) {
                collectJoinedRow(rowData, rowData2);
            } else if (this.isLeftOuterJoin) {
                collectJoinedRow(rowData, this.rightNullRow);
            }
            registerProcessingCleanupTimer();
        }
    }

    private void collectJoinedRow(RowData rowData, RowData rowData2) {
        this.outRow.setRowKind(rowData.getRowKind());
        this.outRow.replace(rowData, rowData2);
        this.collector.collect(this.outRow);
    }

    public void processElement2(StreamRecord<RowData> streamRecord) throws Exception {
        if (RowDataUtil.isAccumulateMsg((RowData) streamRecord.getValue())) {
            this.rightState.update(streamRecord.getValue());
            registerProcessingCleanupTimer();
        } else {
            this.rightState.clear();
            cleanupLastTimer();
        }
    }

    public void close() throws Exception {
        FunctionUtils.closeFunction(this.joinCondition);
        super.close();
    }

    @Override // org.apache.flink.table.runtime.operators.join.temporal.BaseTwoInputStreamOperatorWithStateRetention
    public void cleanupState(long j) {
        this.rightState.clear();
    }

    public void onEventTime(InternalTimer<Object, VoidNamespace> internalTimer) throws Exception {
    }
}
