package org.apache.flink.table.runtime.operators.join.window.utils;

import java.time.ZoneId;
import java.util.IdentityHashMap;
import java.util.Iterator;
import javax.annotation.Nullable;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.RowDataUtil;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.operators.join.JoinConditionWithNullFilters;
import org.apache.flink.table.runtime.operators.window.tvf.common.WindowTimerService;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.function.BiConsumerWithException;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/join/window/utils/WindowJoinHelper.class */
public abstract class WindowJoinHelper {
    private static final String LEFT_LATE_ELEMENTS_DROPPED_METRIC_NAME = "leftNumLateRecordsDropped";
    private static final String LEFT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME = "leftLateRecordsDroppedRate";
    private static final String RIGHT_LATE_ELEMENTS_DROPPED_METRIC_NAME = "rightNumLateRecordsDropped";
    private static final String RIGHT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME = "rightLateRecordsDroppedRate";
    private static final String WATERMARK_LATENCY_METRIC_NAME = "watermarkLatency";
    private final ZoneId shiftTimeZone;
    private final WindowTimerService<Long> windowTimerService;
    protected final RowDataSerializer leftSerializer;
    protected final RowDataSerializer rightSerializer;
    protected final JoinConditionWithNullFilters joinCondition;
    protected final TimestampedCollector<RowData> collector;
    private final WindowJoinProcessor windowJoinProcessor;
    private Meter leftLateRecordsDroppedRate;
    private Meter rightLateRecordsDroppedRate;

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/join/window/utils/WindowJoinHelper$AbstractOuterWindowJoinProcessor.class */
    private abstract class AbstractOuterWindowJoinProcessor implements WindowJoinProcessor {
        private final RowData leftNullRow;
        private final RowData rightNullRow;
        private final JoinedRowData outRow = new JoinedRowData();

        private AbstractOuterWindowJoinProcessor() {
            this.leftNullRow = new GenericRowData(WindowJoinHelper.this.leftSerializer.getArity());
            this.rightNullRow = new GenericRowData(WindowJoinHelper.this.rightSerializer.getArity());
        }

        protected void outputNullPadding(RowData rowData, boolean z) {
            if (z) {
                this.outRow.replace(rowData, this.rightNullRow);
            } else {
                this.outRow.replace(this.leftNullRow, rowData);
            }
            this.outRow.setRowKind(RowKind.INSERT);
            WindowJoinHelper.this.collector.collect(this.outRow);
        }

        protected void outputNullPadding(Iterable<RowData> iterable, boolean z) {
            Iterator<RowData> it = iterable.iterator();
            while (it.hasNext()) {
                outputNullPadding(it.next(), z);
            }
        }

        protected void output(RowData rowData, RowData rowData2, boolean z) {
            if (z) {
                this.outRow.replace(rowData, rowData2);
            } else {
                this.outRow.replace(rowData2, rowData);
            }
            this.outRow.setRowKind(RowKind.INSERT);
            WindowJoinHelper.this.collector.collect(this.outRow);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/runtime/operators/join/window/utils/WindowJoinHelper$FullOuterWindowJoinProcessor.class */
    public class FullOuterWindowJoinProcessor extends AbstractOuterWindowJoinProcessor {
        private FullOuterWindowJoinProcessor() {
            super();
        }

        @Override // org.apache.flink.table.runtime.operators.join.window.utils.WindowJoinHelper.WindowJoinProcessor
        public void doJoin(@Nullable Iterable<RowData> iterable, @Nullable Iterable<RowData> iterable2) {
            if (iterable == null && iterable2 == null) {
                return;
            }
            if (iterable2 == null) {
                outputNullPadding(iterable, true);
                return;
            }
            if (iterable == null) {
                outputNullPadding(iterable2, false);
                return;
            }
            IdentityHashMap identityHashMap = new IdentityHashMap();
            for (RowData rowData : iterable) {
                boolean z = false;
                for (RowData rowData2 : iterable2) {
                    if (WindowJoinHelper.this.joinCondition.apply(rowData, rowData2)) {
                        output(rowData, rowData2, true);
                        z = true;
                        identityHashMap.put(rowData2, Boolean.TRUE);
                    }
                }
                if (!z) {
                    outputNullPadding(rowData, true);
                }
            }
            for (RowData rowData3 : iterable2) {
                if (!identityHashMap.containsKey(rowData3)) {
                    outputNullPadding(rowData3, false);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/runtime/operators/join/window/utils/WindowJoinHelper$InnerWindowJoinProcessor.class */
    public class InnerWindowJoinProcessor implements WindowJoinProcessor {
        private final JoinedRowData outRow = new JoinedRowData();

        private InnerWindowJoinProcessor() {
        }

        @Override // org.apache.flink.table.runtime.operators.join.window.utils.WindowJoinHelper.WindowJoinProcessor
        public void doJoin(@Nullable Iterable<RowData> iterable, @Nullable Iterable<RowData> iterable2) {
            if (iterable == null || iterable2 == null) {
                return;
            }
            for (RowData rowData : iterable) {
                for (RowData rowData2 : iterable2) {
                    if (WindowJoinHelper.this.joinCondition.apply(rowData, rowData2)) {
                        this.outRow.setRowKind(RowKind.INSERT);
                        this.outRow.replace(rowData, rowData2);
                        WindowJoinHelper.this.collector.collect(this.outRow);
                    }
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/runtime/operators/join/window/utils/WindowJoinHelper$LeftOuterWindowJoinProcessor.class */
    public class LeftOuterWindowJoinProcessor extends AbstractOuterWindowJoinProcessor {
        private LeftOuterWindowJoinProcessor() {
            super();
        }

        @Override // org.apache.flink.table.runtime.operators.join.window.utils.WindowJoinHelper.WindowJoinProcessor
        public void doJoin(@Nullable Iterable<RowData> iterable, @Nullable Iterable<RowData> iterable2) {
            if (iterable == null) {
                return;
            }
            if (iterable2 == null) {
                outputNullPadding(iterable, true);
                return;
            }
            for (RowData rowData : iterable) {
                boolean z = false;
                for (RowData rowData2 : iterable2) {
                    if (WindowJoinHelper.this.joinCondition.apply(rowData, rowData2)) {
                        output(rowData, rowData2, true);
                        z = true;
                    }
                }
                if (!z) {
                    outputNullPadding(rowData, true);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/runtime/operators/join/window/utils/WindowJoinHelper$RightOuterWindowJoinProcessor.class */
    public class RightOuterWindowJoinProcessor extends AbstractOuterWindowJoinProcessor {
        private RightOuterWindowJoinProcessor() {
            super();
        }

        @Override // org.apache.flink.table.runtime.operators.join.window.utils.WindowJoinHelper.WindowJoinProcessor
        public void doJoin(@Nullable Iterable<RowData> iterable, @Nullable Iterable<RowData> iterable2) {
            if (iterable2 == null) {
                return;
            }
            if (iterable == null) {
                outputNullPadding(iterable2, false);
                return;
            }
            for (RowData rowData : iterable2) {
                boolean z = false;
                for (RowData rowData2 : iterable) {
                    if (WindowJoinHelper.this.joinCondition.apply(rowData2, rowData)) {
                        output(rowData2, rowData, true);
                        z = true;
                    }
                }
                if (!z) {
                    outputNullPadding(rowData, false);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/runtime/operators/join/window/utils/WindowJoinHelper$SemiAntiWindowJoinProcessor.class */
    public class SemiAntiWindowJoinProcessor implements WindowJoinProcessor {
        private final boolean isAntiJoin;

        public SemiAntiWindowJoinProcessor(boolean z) {
            this.isAntiJoin = z;
        }

        @Override // org.apache.flink.table.runtime.operators.join.window.utils.WindowJoinHelper.WindowJoinProcessor
        public void doJoin(@Nullable Iterable<RowData> iterable, @Nullable Iterable<RowData> iterable2) {
            if (iterable == null) {
                return;
            }
            if (iterable2 == null) {
                if (this.isAntiJoin) {
                    Iterator<RowData> it = iterable.iterator();
                    while (it.hasNext()) {
                        WindowJoinHelper.this.collector.collect(it.next());
                    }
                    return;
                }
                return;
            }
            for (RowData rowData : iterable) {
                boolean z = false;
                Iterator<RowData> it2 = iterable2.iterator();
                while (true) {
                    if (!it2.hasNext()) {
                        break;
                    }
                    if (WindowJoinHelper.this.joinCondition.apply(rowData, it2.next())) {
                        z = true;
                        break;
                    }
                }
                if (z) {
                    if (!this.isAntiJoin) {
                        WindowJoinHelper.this.collector.collect(rowData);
                    }
                } else if (this.isAntiJoin) {
                    WindowJoinHelper.this.collector.collect(rowData);
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/join/window/utils/WindowJoinHelper$WindowJoinProcessor.class */
    private interface WindowJoinProcessor {
        void doJoin(@Nullable Iterable<RowData> iterable, @Nullable Iterable<RowData> iterable2);
    }

    public WindowJoinHelper(RowDataSerializer rowDataSerializer, RowDataSerializer rowDataSerializer2, ZoneId zoneId, WindowTimerService<Long> windowTimerService, JoinConditionWithNullFilters joinConditionWithNullFilters, TimestampedCollector<RowData> timestampedCollector, FlinkJoinType flinkJoinType) {
        this.leftSerializer = rowDataSerializer;
        this.rightSerializer = rowDataSerializer2;
        this.shiftTimeZone = zoneId;
        this.windowTimerService = windowTimerService;
        this.joinCondition = joinConditionWithNullFilters;
        this.collector = timestampedCollector;
        this.windowJoinProcessor = getWindowJoinProcessor(flinkJoinType);
    }

    public void registerMetric(OperatorMetricGroup operatorMetricGroup) {
        this.leftLateRecordsDroppedRate = operatorMetricGroup.meter(LEFT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME, new MeterView(operatorMetricGroup.counter(LEFT_LATE_ELEMENTS_DROPPED_METRIC_NAME)));
        this.rightLateRecordsDroppedRate = operatorMetricGroup.meter(RIGHT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME, new MeterView(operatorMetricGroup.counter(RIGHT_LATE_ELEMENTS_DROPPED_METRIC_NAME)));
        operatorMetricGroup.gauge(WATERMARK_LATENCY_METRIC_NAME, () -> {
            long currentWatermark = this.windowTimerService.currentWatermark();
            if (currentWatermark < 0) {
                return 0L;
            }
            return Long.valueOf(this.windowTimerService.currentProcessingTime() - currentWatermark);
        });
    }

    public void processElement(StreamRecord<RowData> streamRecord, int i, Meter meter, BiConsumerWithException<Long, RowData, Exception> biConsumerWithException) throws Exception {
        RowData rowData = (RowData) streamRecord.getValue();
        long j = rowData.getLong(i);
        if (TimeWindowUtil.isWindowFired(j, this.windowTimerService.currentWatermark(), this.shiftTimeZone)) {
            meter.markEvent();
        } else {
            if (!RowDataUtil.isAccumulateMsg(rowData)) {
                throw new UnsupportedOperationException("This is a bug and should not happen. Please file an issue.");
            }
            biConsumerWithException.accept(Long.valueOf(j), rowData);
            this.windowTimerService.registerEventTimeWindowTimer(Long.valueOf(j));
        }
    }

    public void joinAndClear(long j, @Nullable Iterable<RowData> iterable, @Nullable Iterable<RowData> iterable2) throws Exception {
        this.windowJoinProcessor.doJoin(iterable, iterable2);
        if (iterable != null) {
            clearState(j, true);
        }
        if (iterable2 != null) {
            clearState(j, false);
        }
    }

    public Meter getLeftLateRecordsDroppedRate() {
        return this.leftLateRecordsDroppedRate;
    }

    public Meter getRightLateRecordsDroppedRate() {
        return this.rightLateRecordsDroppedRate;
    }

    public abstract void clearState(long j, boolean z) throws Exception;

    private WindowJoinProcessor getWindowJoinProcessor(FlinkJoinType flinkJoinType) {
        switch (flinkJoinType) {
            case INNER:
                return new InnerWindowJoinProcessor();
            case SEMI:
                return new SemiAntiWindowJoinProcessor(false);
            case ANTI:
                return new SemiAntiWindowJoinProcessor(true);
            case LEFT:
                return new LeftOuterWindowJoinProcessor();
            case RIGHT:
                return new RightOuterWindowJoinProcessor();
            case FULL:
                return new FullOuterWindowJoinProcessor();
            default:
                throw new IllegalArgumentException("Invalid join type: " + flinkJoinType);
        }
    }
}
