package org.apache.flink.iteration.operator.perround;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.iteration.IterationRecord;
import org.apache.flink.iteration.operator.OperatorUtils;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.FlinkRuntimeException;

/* loaded from: input_file:org/apache/flink/iteration/operator/perround/MultipleInputPerRoundWrapperOperator.class */
/**
 * Per-round wrapper around a {@link MultipleInputStreamOperator}.
 *
 * <p>The wrapper exposes one {@link ProxyInput} per logical input. Each proxy inspects the
 * incoming {@link IterationRecord}: ordinary records are unwrapped and handed to the matching
 * input of the wrapped operator that belongs to the record's epoch, while epoch watermarks are
 * reported to the enclosing wrapper through {@code onEpochWatermarkEvent}.
 *
 * @param <OUT> output element type of the wrapped operator
 */
public class MultipleInputPerRoundWrapperOperator<OUT> extends AbstractPerRoundWrapperOperator<OUT, MultipleInputStreamOperator<OUT>> implements MultipleInputStreamOperator<IterationRecord<OUT>> {
    /** Number of distinct type numbers found among the physical input edges of this operator. */
    private final int numberOfInputs;

    /**
     * Inputs of the wrapped operator, keyed by epoch. An entry is (re)written every time
     * {@link #mo22getWrappedOperator(int)} is called for that epoch and dropped again in
     * {@link #closeStreamOperator}.
     */
    @SuppressWarnings("rawtypes") // MultipleInputStreamOperator#getInputs() itself returns List<Input>.
    private final Map<Integer, List<Input>> operatorInputsByEpoch = new HashMap<>();

    /**
     * Input handed to the runtime for one logical input of the wrapper. It forwards every call to
     * the input with the same index of the wrapped operator(s).
     *
     * @param <IN> element type carried inside the incoming {@link IterationRecord}s
     */
    private class ProxyInput<IN> implements Input<IterationRecord<IN>> {
        /** Zero-based position of this input inside the lists returned by {@code getInputs()}. */
        private final int inputIndex;

        /** Single record instance that is re-filled for every forwarded element. */
        private final StreamRecord<IN> reusedInput = new StreamRecord<>(null, 0);

        public ProxyInput(int i) {
            this.inputIndex = i;
        }

        /**
         * Dispatches one iteration record according to its type.
         *
         * @param streamRecord the incoming record
         * @throws FlinkRuntimeException if the record type is neither {@code RECORD} nor
         *     {@code EPOCH_WATERMARK}
         * @throws Exception whatever the wrapped input or the watermark handling throws
         */
        @Override
        @SuppressWarnings("unchecked") // The cached inputs are raw, see operatorInputsByEpoch.
        public void processElement(StreamRecord<IterationRecord<IN>> streamRecord) throws Exception {
            IterationRecord<IN> iterationRecord = streamRecord.getValue();
            switch (iterationRecord.getType()) {
                case RECORD:
                    int epoch = iterationRecord.getEpoch();
                    // Called for its side effect: it populates operatorInputsByEpoch for this epoch.
                    MultipleInputPerRoundWrapperOperator.this.mo22getWrappedOperator(epoch);
                    this.reusedInput.replace(iterationRecord.getValue(), streamRecord.getTimestamp());
                    MultipleInputPerRoundWrapperOperator.this.setIterationContextRound(epoch);
                    MultipleInputPerRoundWrapperOperator.this.operatorInputsByEpoch
                            .get(epoch)
                            .get(this.inputIndex)
                            .processElement(this.reusedInput);
                    MultipleInputPerRoundWrapperOperator.this.clearIterationContextRound();
                    return;
                case EPOCH_WATERMARK:
                    MultipleInputPerRoundWrapperOperator.this.onEpochWatermarkEvent(this.inputIndex, iterationRecord);
                    return;
                default:
                    throw new FlinkRuntimeException("Not supported iteration record type: " + streamRecord);
            }
        }

        /** Forwards the watermark to this input index of every wrapped operator. */
        @Override
        public void processWatermark(Watermark watermark) throws Exception {
            MultipleInputPerRoundWrapperOperator.this.processForEachWrappedOperator(
                    (epoch, wrappedOperator) ->
                            MultipleInputPerRoundWrapperOperator.this.operatorInputsByEpoch
                                    .get(epoch)
                                    .get(this.inputIndex)
                                    .processWatermark(watermark));
        }

        /** Forwards the watermark status to this input index of every wrapped operator. */
        @Override
        public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
            MultipleInputPerRoundWrapperOperator.this.processForEachWrappedOperator(
                    (epoch, wrappedOperator) ->
                            MultipleInputPerRoundWrapperOperator.this.operatorInputsByEpoch
                                    .get(epoch)
                                    .get(this.inputIndex)
                                    .processWatermarkStatus(watermarkStatus));
        }

        /** Latency markers are handled by the wrapper itself and never reach the wrapped inputs. */
        @Override
        public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
            MultipleInputPerRoundWrapperOperator.this.reportOrForwardLatencyMarker(latencyMarker);
        }

        /**
         * Sets the key context on the wrapped input of the record's epoch. Records that are not of
         * type {@code RECORD} are ignored.
         */
        @Override
        @SuppressWarnings("unchecked") // The cached inputs are raw, see operatorInputsByEpoch.
        public void setKeyContextElement(StreamRecord<IterationRecord<IN>> streamRecord) throws Exception {
            IterationRecord<IN> iterationRecord = streamRecord.getValue();
            if (iterationRecord.getType() != IterationRecord.Type.RECORD) {
                return;
            }
            int epoch = iterationRecord.getEpoch();
            // Called for its side effect: it populates operatorInputsByEpoch for this epoch.
            MultipleInputPerRoundWrapperOperator.this.mo22getWrappedOperator(epoch);
            // NOTE(review): unlike processElement, the IterationRecord itself (not its payload) is
            // forwarded here. Presumably the key selector seen by the wrapped operator expects the
            // wrapped record - confirm before changing.
            this.reusedInput.replace(iterationRecord, streamRecord.getTimestamp());
            MultipleInputPerRoundWrapperOperator.this.operatorInputsByEpoch
                    .get(epoch)
                    .get(this.inputIndex)
                    .setKeyContextElement(this.reusedInput);
        }
    }

    /**
     * Creates the wrapper and derives the number of inputs from the distinct type numbers of the
     * physical input edges found in the stream configuration.
     *
     * @param streamOperatorParameters parameters passed on to the super class
     * @param streamOperatorFactory factory passed on to the super class
     */
    public MultipleInputPerRoundWrapperOperator(StreamOperatorParameters<IterationRecord<OUT>> streamOperatorParameters, StreamOperatorFactory<OUT> streamOperatorFactory) {
        super(streamOperatorParameters, streamOperatorFactory);
        this.numberOfInputs = this.streamConfig
                .getInPhysicalEdges(this.containingTask.getUserCodeClassLoader())
                .stream()
                .map(edge -> edge.getTypeNumber())
                .collect(Collectors.toSet())
                .size();
    }

    /**
     * Returns the wrapped operator for the given epoch as provided by the super class and records
     * its inputs in {@link #operatorInputsByEpoch}.
     *
     * <p>The method name is decompiler generated (originally {@code getWrappedOperator}) and is
     * kept so that it still overrides the super class method.
     *
     * @param i the epoch whose wrapped operator is requested
     * @return the wrapped operator returned by the super class
     */
    @Override // org.apache.flink.iteration.operator.perround.AbstractPerRoundWrapperOperator
    public MultipleInputStreamOperator<OUT> mo22getWrappedOperator(int i) {
        MultipleInputStreamOperator<OUT> wrappedOperator = super.mo22getWrappedOperator(i);
        // NOTE(review): getInputs() is queried and the cache entry overwritten on every call, not
        // only on first use - verify this is intended before switching to computeIfAbsent.
        this.operatorInputsByEpoch.put(i, wrappedOperator.getInputs());
        return wrappedOperator;
    }

    /**
     * Signals end of input for every input of the given operator (if it or its UDF satisfies
     * {@link BoundedMultiInput}) and then sends a {@code Long.MAX_VALUE} watermark to each cached
     * input of the epoch.
     *
     * @param multipleInputStreamOperator the wrapped operator being finished
     * @param i key of the operator's inputs in {@link #operatorInputsByEpoch}
     * @param i2 not read by this method
     */
    @Override // org.apache.flink.iteration.operator.perround.AbstractPerRoundWrapperOperator
    public void endInputAndEmitMaxWatermark(MultipleInputStreamOperator<OUT> multipleInputStreamOperator, int i, int i2) throws Exception {
        OperatorUtils.processOperatorOrUdfIfSatisfy(multipleInputStreamOperator, BoundedMultiInput.class, boundedMultiInput -> {
            // BoundedMultiInput is addressed with one-based ids, the cached list is zero-based.
            for (int inputId = 1; inputId <= this.numberOfInputs; inputId++) {
                boundedMultiInput.endInput(inputId);
            }
        });
        for (int index = 0; index < this.numberOfInputs; index++) {
            this.operatorInputsByEpoch.get(i).get(index).processWatermark(new Watermark(Long.MAX_VALUE));
        }
    }

    /**
     * Lets the super class close the operator and afterwards forgets its cached inputs.
     *
     * @param multipleInputStreamOperator the wrapped operator to close
     * @param i key of the operator's inputs in {@link #operatorInputsByEpoch}
     * @param i2 forwarded unchanged to the super class
     */
    @Override // org.apache.flink.iteration.operator.perround.AbstractPerRoundWrapperOperator
    public void closeStreamOperator(MultipleInputStreamOperator<OUT> multipleInputStreamOperator, int i, int i2) throws Exception {
        // The wrapped operator is passed as is: it is not an instance of this wrapper class, so
        // down-casting it to MultipleInputPerRoundWrapperOperator would fail.
        super.closeStreamOperator(multipleInputStreamOperator, i, i2);
        this.operatorInputsByEpoch.remove(i);
    }

    /**
     * Builds a fresh list with one {@link ProxyInput} per logical input, using zero-based indices.
     *
     * @return a new mutable list of {@code numberOfInputs} proxy inputs
     */
    @Override
    @SuppressWarnings("rawtypes") // Return type is dictated by MultipleInputStreamOperator.
    public List<Input> getInputs() {
        List<Input> proxyInputs = new ArrayList<>(this.numberOfInputs);
        for (int index = 0; index < this.numberOfInputs; index++) {
            proxyInputs.add(new ProxyInput<>(index));
        }
        return proxyInputs;
    }
}
