package org.apache.flink.table.runtime.operators.join.window;

import java.time.ZoneId;
import org.apache.flink.api.common.functions.DefaultOpenContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.KeyContext;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
import org.apache.flink.table.runtime.operators.TableStreamOperator;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.operators.join.JoinConditionWithNullFilters;
import org.apache.flink.table.runtime.operators.join.window.utils.WindowJoinHelper;
import org.apache.flink.table.runtime.operators.window.tvf.common.WindowTimerService;
import org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingWindowTimerServiceImpl;
import org.apache.flink.table.runtime.operators.window.tvf.state.WindowListState;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.class */
public class WindowJoinOperator extends TableStreamOperator<RowData> implements TwoInputStreamOperator<RowData, RowData, RowData>, Triggerable<RowData, Long>, KeyContext {
    private static final long serialVersionUID = 1;
    private static final String LEFT_RECORDS_STATE_NAME = "left-records";
    private static final String RIGHT_RECORDS_STATE_NAME = "right-records";
    private final RowDataSerializer leftSerializer;
    private final RowDataSerializer rightSerializer;
    private final GeneratedJoinCondition generatedJoinCondition;
    private final int leftWindowEndIndex;
    private final int rightWindowEndIndex;
    private final boolean[] filterNullKeys;
    private final ZoneId shiftTimeZone;
    private final FlinkJoinType joinType;
    private transient WindowTimerService<Long> windowTimerService;
    private transient JoinConditionWithNullFilters joinCondition;
    private transient TimestampedCollector<RowData> collector;
    private transient WindowListState<Long> leftWindowState;
    private transient WindowListState<Long> rightWindowState;
    private transient WindowJoinHelper helper;

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator$SyncStateWindowJoinHelper.class */
    private class SyncStateWindowJoinHelper extends WindowJoinHelper {
        public SyncStateWindowJoinHelper() {
            super(WindowJoinOperator.this.leftSerializer, WindowJoinOperator.this.rightSerializer, WindowJoinOperator.this.shiftTimeZone, WindowJoinOperator.this.windowTimerService, WindowJoinOperator.this.joinCondition, WindowJoinOperator.this.collector, WindowJoinOperator.this.joinType);
        }

        @Override // org.apache.flink.table.runtime.operators.join.window.utils.WindowJoinHelper
        public void clearState(long j, boolean z) {
            if (z) {
                WindowJoinOperator.this.leftWindowState.clear(Long.valueOf(j));
            } else {
                WindowJoinOperator.this.rightWindowState.clear(Long.valueOf(j));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public WindowJoinOperator(TypeSerializer<RowData> typeSerializer, TypeSerializer<RowData> typeSerializer2, GeneratedJoinCondition generatedJoinCondition, int i, int i2, boolean[] zArr, ZoneId zoneId, FlinkJoinType flinkJoinType) {
        this.leftSerializer = (RowDataSerializer) typeSerializer;
        this.rightSerializer = (RowDataSerializer) typeSerializer2;
        this.generatedJoinCondition = generatedJoinCondition;
        this.leftWindowEndIndex = i;
        this.rightWindowEndIndex = i2;
        this.filterNullKeys = zArr;
        this.shiftTimeZone = zoneId;
        this.joinType = flinkJoinType;
    }

    @Override // org.apache.flink.table.runtime.operators.TableStreamOperator
    public void open() throws Exception {
        super.open();
        this.collector = new TimestampedCollector<>(this.output);
        this.collector.eraseTimestamp();
        LongSerializer longSerializer = LongSerializer.INSTANCE;
        this.windowTimerService = new SlicingWindowTimerServiceImpl(getInternalTimerService("window-timers", longSerializer, this), this.shiftTimeZone);
        this.joinCondition = new JoinConditionWithNullFilters(this.generatedJoinCondition.newInstance(getRuntimeContext().getUserCodeClassLoader()), this.filterNullKeys, this);
        this.joinCondition.setRuntimeContext(getRuntimeContext());
        this.joinCondition.open(DefaultOpenContext.INSTANCE);
        this.leftWindowState = new WindowListState<>((ListState) getOrCreateKeyedState(longSerializer, new ListStateDescriptor(LEFT_RECORDS_STATE_NAME, this.leftSerializer)));
        this.rightWindowState = new WindowListState<>((ListState) getOrCreateKeyedState(longSerializer, new ListStateDescriptor(RIGHT_RECORDS_STATE_NAME, this.rightSerializer)));
        this.helper = new SyncStateWindowJoinHelper();
        this.helper.registerMetric(getRuntimeContext().getMetricGroup());
    }

    public void close() throws Exception {
        super.close();
        this.collector = null;
        if (this.joinCondition != null) {
            this.joinCondition.close();
        }
    }

    public void processElement1(StreamRecord<RowData> streamRecord) throws Exception {
        this.helper.processElement(streamRecord, this.leftWindowEndIndex, this.helper.getLeftLateRecordsDroppedRate(), (l, rowData) -> {
            this.leftWindowState.add(l, rowData);
        });
    }

    public void processElement2(StreamRecord<RowData> streamRecord) throws Exception {
        this.helper.processElement(streamRecord, this.rightWindowEndIndex, this.helper.getRightLateRecordsDroppedRate(), (l, rowData) -> {
            this.rightWindowState.add(l, rowData);
        });
    }

    public void onProcessingTime(InternalTimer<RowData, Long> internalTimer) throws Exception {
        throw new UnsupportedOperationException("This is a bug and should not happen. Please file an issue.");
    }

    public void onEventTime(InternalTimer<RowData, Long> internalTimer) throws Exception {
        setCurrentKey(internalTimer.getKey());
        Long l = (Long) internalTimer.getNamespace();
        this.helper.joinAndClear(l.longValue(), this.leftWindowState.get(l), this.rightWindowState.get(l));
    }
}
