package org.apache.flink.datastream.impl.extension.join.operators;

import java.util.Iterator;
import org.apache.flink.api.common.state.v2.ListState;
import org.apache.flink.api.common.state.v2.ListStateDescriptor;
import org.apache.flink.datastream.api.extension.join.JoinType;
import org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction;
import org.apache.flink.datastream.impl.operators.KeyedTwoInputNonBroadcastProcessOperator;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/datastream/impl/extension/join/operators/TwoInputNonBroadcastJoinProcessOperator.class */
/**
 * Two-input keyed operator that joins records from both inputs using two keyed list states.
 *
 * <p>Every record arriving on one input is first paired with all records currently stored for the
 * opposite input, each pair being handed to the join function supplied by the wrapped
 * {@link TwoInputNonBroadcastJoinProcessFunction}; the record is then appended to the state of its
 * own side so that later records from the other input can be paired with it.
 *
 * <p>Only {@link JoinType#INNER} is accepted; any other join type is rejected at construction time.
 *
 * @param <K> type of the key
 * @param <IN1> type of the records on the first (left) input
 * @param <IN2> type of the records on the second (right) input
 * @param <OUT> type of the records emitted by the join function
 */
public class TwoInputNonBroadcastJoinProcessOperator<K, IN1, IN2, OUT> extends KeyedTwoInputNonBroadcastProcessOperator<K, IN1, IN2, OUT> {
    /** The user function, narrowed to the join-specific type so the join function is reachable. */
    private final TwoInputNonBroadcastJoinProcessFunction<IN1, IN2, OUT> joinProcessFunction;

    /** Descriptor used in {@link #open()} to obtain the state holding left-side records. */
    private final ListStateDescriptor<IN1> leftStateDescriptor;

    /** Descriptor used in {@link #open()} to obtain the state holding right-side records. */
    private final ListStateDescriptor<IN2> rightStateDescriptor;

    /** Records received so far on the first input; assigned in {@link #open()}. */
    private transient ListState<IN1> leftState;

    /** Records received so far on the second input; assigned in {@link #open()}. */
    private transient ListState<IN2> rightState;

    /**
     * Creates the join operator.
     *
     * @param twoInputNonBroadcastStreamProcessFunction the process function; it is cast to
     *     {@link TwoInputNonBroadcastJoinProcessFunction}, so passing any other implementation
     *     fails with a {@link ClassCastException}
     * @param listStateDescriptor descriptor of the state that stores left-side records
     * @param listStateDescriptor2 descriptor of the state that stores right-side records
     * @throws IllegalArgumentException if the function's join type is not {@link JoinType#INNER}
     */
    public TwoInputNonBroadcastJoinProcessOperator(TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> twoInputNonBroadcastStreamProcessFunction, ListStateDescriptor<IN1> listStateDescriptor, ListStateDescriptor<IN2> listStateDescriptor2) {
        super(twoInputNonBroadcastStreamProcessFunction);
        // Parameterized cast (instead of a raw one) keeps the assignment type-safe.
        this.joinProcessFunction =
                (TwoInputNonBroadcastJoinProcessFunction<IN1, IN2, OUT>) twoInputNonBroadcastStreamProcessFunction;
        Preconditions.checkArgument(this.joinProcessFunction.getJoinType() == JoinType.INNER, "Currently only support INNER join.");
        this.leftStateDescriptor = listStateDescriptor;
        this.rightStateDescriptor = listStateDescriptor2;
    }

    /**
     * Opens the parent operator and then obtains both list states from their descriptors, using
     * {@link VoidNamespace} as the namespace.
     */
    @Override
    public void open() throws Exception {
        super.open();
        this.leftState = getOrCreateKeyedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, this.leftStateDescriptor);
        this.rightState = getOrCreateKeyedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, this.rightStateDescriptor);
    }

    /**
     * Handles a record from the first input: pairs it with every stored right-side record, then
     * stores it in the left-side state.
     *
     * @param streamRecord the incoming left-side record
     */
    @Override
    public void processElement1(StreamRecord<IN1> streamRecord) throws Exception {
        this.collector.setTimestampFromStreamRecord(streamRecord);
        IN1 leftRecord = streamRecord.getValue();
        Iterable<IN2> rightRecords = this.rightState.get();
        // The state lookup may yield null; in that case there is nothing to join against.
        if (rightRecords != null) {
            for (IN2 rightRecord : rightRecords) {
                this.joinProcessFunction.getJoinFunction().processRecord(leftRecord, rightRecord, this.collector, this.partitionedContext);
            }
        }
        // Stored only after joining, so the record is visible to later right-side records.
        this.leftState.add(leftRecord);
    }

    /**
     * Handles a record from the second input: pairs every stored left-side record with it, then
     * stores it in the right-side state.
     *
     * @param streamRecord the incoming right-side record
     */
    @Override
    public void processElement2(StreamRecord<IN2> streamRecord) throws Exception {
        this.collector.setTimestampFromStreamRecord(streamRecord);
        // Read the state once and reuse the result for both the null check and the iteration.
        Iterable<IN1> leftRecords = this.leftState.get();
        IN2 rightRecord = streamRecord.getValue();
        if (leftRecords != null) {
            for (IN1 leftRecord : leftRecords) {
                this.joinProcessFunction.getJoinFunction().processRecord(leftRecord, rightRecord, this.collector, this.partitionedContext);
            }
        }
        // Stored only after joining, so the record is visible to later left-side records.
        this.rightState.add(rightRecord);
    }
}
