package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.util.BitSet;
import java.util.Collection;
import java.util.Iterator;
import java.util.Optional;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.class */
public final class StreamTwoInputProcessor<IN1, IN2> implements StreamInputProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(StreamTwoInputProcessor.class);
    private final RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializers;
    private RecordDeserializer<DeserializationDelegate<StreamElement>> currentRecordDeserializer;
    private final DeserializationDelegate<StreamElement> deserializationDelegate1;
    private final DeserializationDelegate<StreamElement> deserializationDelegate2;
    private final CheckpointedInputGate barrierHandler;
    private final Object lock;
    private final OperatorChain<?, ?> operatorChain;
    private StreamStatus firstStatus;
    private StreamStatus secondStatus;
    private StatusWatermarkValve statusWatermarkValve1;
    private StatusWatermarkValve statusWatermarkValve2;
    private final int numInputChannels1;
    private final int numInputChannels2;
    private int currentChannel = -1;
    private final StreamStatusMaintainer streamStatusMaintainer;
    private final TwoInputStreamOperator<IN1, IN2, ?> streamOperator;
    private final WatermarkGauge input1WatermarkGauge;
    private final WatermarkGauge input2WatermarkGauge;
    private Counter numRecordsIn;
    private final BitSet finishedChannels1;
    private final BitSet finishedChannels2;
    private boolean isFinished;

    /* loaded from: input_file:org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor$ForwardingValveOutputHandler1.class */
    private class ForwardingValveOutputHandler1 implements StatusWatermarkValve.ValveOutputHandler {
        private final TwoInputStreamOperator<IN1, IN2, ?> operator;
        private final Object lock;

        private ForwardingValveOutputHandler1(TwoInputStreamOperator<IN1, IN2, ?> twoInputStreamOperator, Object obj) {
            this.operator = (TwoInputStreamOperator) Preconditions.checkNotNull(twoInputStreamOperator);
            this.lock = Preconditions.checkNotNull(obj);
        }

        @Override // org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.ValveOutputHandler
        public void handleWatermark(Watermark watermark) {
            try {
                synchronized (this.lock) {
                    StreamTwoInputProcessor.this.input1WatermarkGauge.setCurrentWatermark(watermark.getTimestamp());
                    this.operator.processWatermark1(watermark);
                }
            } catch (Exception e) {
                throw new RuntimeException("Exception occurred while processing valve output watermark: ", e);
            }
        }

        @Override // org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.ValveOutputHandler
        public void handleStreamStatus(StreamStatus streamStatus) {
            try {
                synchronized (this.lock) {
                    StreamTwoInputProcessor.this.firstStatus = streamStatus;
                    if (!streamStatus.equals(StreamTwoInputProcessor.this.streamStatusMaintainer.getStreamStatus())) {
                        if (streamStatus.isActive()) {
                            StreamTwoInputProcessor.this.streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
                        } else if (StreamTwoInputProcessor.this.secondStatus.isIdle()) {
                            StreamTwoInputProcessor.this.streamStatusMaintainer.toggleStreamStatus(StreamStatus.IDLE);
                        }
                    }
                }
            } catch (Exception e) {
                throw new RuntimeException("Exception occurred while processing valve output stream status: ", e);
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor$ForwardingValveOutputHandler2.class */
    private class ForwardingValveOutputHandler2 implements StatusWatermarkValve.ValveOutputHandler {
        private final TwoInputStreamOperator<IN1, IN2, ?> operator;
        private final Object lock;

        private ForwardingValveOutputHandler2(TwoInputStreamOperator<IN1, IN2, ?> twoInputStreamOperator, Object obj) {
            this.operator = (TwoInputStreamOperator) Preconditions.checkNotNull(twoInputStreamOperator);
            this.lock = Preconditions.checkNotNull(obj);
        }

        @Override // org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.ValveOutputHandler
        public void handleWatermark(Watermark watermark) {
            try {
                synchronized (this.lock) {
                    StreamTwoInputProcessor.this.input2WatermarkGauge.setCurrentWatermark(watermark.getTimestamp());
                    this.operator.processWatermark2(watermark);
                }
            } catch (Exception e) {
                throw new RuntimeException("Exception occurred while processing valve output watermark: ", e);
            }
        }

        @Override // org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.ValveOutputHandler
        public void handleStreamStatus(StreamStatus streamStatus) {
            try {
                synchronized (this.lock) {
                    StreamTwoInputProcessor.this.secondStatus = streamStatus;
                    if (!streamStatus.equals(StreamTwoInputProcessor.this.streamStatusMaintainer.getStreamStatus())) {
                        if (streamStatus.isActive()) {
                            StreamTwoInputProcessor.this.streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
                        } else if (StreamTwoInputProcessor.this.firstStatus.isIdle()) {
                            StreamTwoInputProcessor.this.streamStatusMaintainer.toggleStreamStatus(StreamStatus.IDLE);
                        }
                    }
                }
            } catch (Exception e) {
                throw new RuntimeException("Exception occurred while processing valve output stream status: ", e);
            }
        }
    }

    public StreamTwoInputProcessor(Collection<InputGate> collection, Collection<InputGate> collection2, TypeSerializer<IN1> typeSerializer, TypeSerializer<IN2> typeSerializer2, TwoInputStreamTask<IN1, IN2, ?> twoInputStreamTask, CheckpointingMode checkpointingMode, Object obj, IOManager iOManager, Configuration configuration, StreamStatusMaintainer streamStatusMaintainer, TwoInputStreamOperator<IN1, IN2, ?> twoInputStreamOperator, TaskIOMetricGroup taskIOMetricGroup, WatermarkGauge watermarkGauge, WatermarkGauge watermarkGauge2, String str, OperatorChain<?, ?> operatorChain) throws IOException {
        InputGate createInputGate = InputGateUtil.createInputGate(collection, collection2);
        this.barrierHandler = InputProcessorUtil.createCheckpointedInputGate(twoInputStreamTask, checkpointingMode, iOManager, createInputGate, configuration, str);
        this.lock = Preconditions.checkNotNull(obj);
        this.deserializationDelegate1 = new NonReusingDeserializationDelegate(new StreamElementSerializer(typeSerializer));
        this.deserializationDelegate2 = new NonReusingDeserializationDelegate(new StreamElementSerializer(typeSerializer2));
        this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[createInputGate.getNumberOfInputChannels()];
        for (int i = 0; i < this.recordDeserializers.length; i++) {
            this.recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer(iOManager.getSpillingDirectoriesPaths());
        }
        int i2 = 0;
        Iterator<InputGate> it = collection.iterator();
        while (it.hasNext()) {
            i2 += it.next().getNumberOfInputChannels();
        }
        this.numInputChannels1 = i2;
        this.numInputChannels2 = createInputGate.getNumberOfInputChannels() - i2;
        this.firstStatus = StreamStatus.ACTIVE;
        this.secondStatus = StreamStatus.ACTIVE;
        this.streamStatusMaintainer = (StreamStatusMaintainer) Preconditions.checkNotNull(streamStatusMaintainer);
        this.streamOperator = (TwoInputStreamOperator) Preconditions.checkNotNull(twoInputStreamOperator);
        this.statusWatermarkValve1 = new StatusWatermarkValve(i2, new ForwardingValveOutputHandler1(twoInputStreamOperator, obj));
        this.statusWatermarkValve2 = new StatusWatermarkValve(this.numInputChannels2, new ForwardingValveOutputHandler2(twoInputStreamOperator, obj));
        this.input1WatermarkGauge = watermarkGauge;
        this.input2WatermarkGauge = watermarkGauge2;
        CheckpointedInputGate checkpointedInputGate = this.barrierHandler;
        checkpointedInputGate.getClass();
        taskIOMetricGroup.gauge("checkpointAlignmentTime", checkpointedInputGate::getAlignmentDurationNanos);
        this.operatorChain = (OperatorChain) Preconditions.checkNotNull(operatorChain);
        this.finishedChannels1 = new BitSet();
        this.finishedChannels2 = new BitSet();
    }

    @Override // org.apache.flink.streaming.runtime.io.StreamInputProcessor
    public boolean processInput() throws Exception {
        if (this.isFinished) {
            return false;
        }
        if (this.numRecordsIn == null) {
            try {
                this.numRecordsIn = this.streamOperator.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter();
            } catch (Exception e) {
                LOG.warn("An exception occurred during the metrics setup.", e);
                this.numRecordsIn = new SimpleCounter();
            }
        }
        while (true) {
            if (this.currentRecordDeserializer != null) {
                RecordDeserializer.DeserializationResult nextRecord = this.currentChannel < this.numInputChannels1 ? this.currentRecordDeserializer.getNextRecord(this.deserializationDelegate1) : this.currentRecordDeserializer.getNextRecord(this.deserializationDelegate2);
                if (nextRecord.isBufferConsumed()) {
                    this.currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
                    this.currentRecordDeserializer = null;
                }
                if (nextRecord.isFullRecord()) {
                    if (this.currentChannel < this.numInputChannels1) {
                        StreamElement streamElement = (StreamElement) this.deserializationDelegate1.getInstance();
                        if (streamElement.isWatermark()) {
                            this.statusWatermarkValve1.inputWatermark(streamElement.asWatermark(), this.currentChannel);
                        } else if (streamElement.isStreamStatus()) {
                            this.statusWatermarkValve1.inputStreamStatus(streamElement.asStreamStatus(), this.currentChannel);
                        } else {
                            if (!streamElement.isLatencyMarker()) {
                                StreamRecord<?> asRecord = streamElement.asRecord();
                                synchronized (this.lock) {
                                    this.numRecordsIn.inc();
                                    this.streamOperator.setKeyContextElement1(asRecord);
                                    this.streamOperator.processElement1(asRecord);
                                }
                                return true;
                            }
                            synchronized (this.lock) {
                                this.streamOperator.processLatencyMarker1(streamElement.asLatencyMarker());
                            }
                        }
                    } else {
                        StreamElement streamElement2 = (StreamElement) this.deserializationDelegate2.getInstance();
                        if (streamElement2.isWatermark()) {
                            this.statusWatermarkValve2.inputWatermark(streamElement2.asWatermark(), this.currentChannel - this.numInputChannels1);
                        } else if (streamElement2.isStreamStatus()) {
                            this.statusWatermarkValve2.inputStreamStatus(streamElement2.asStreamStatus(), this.currentChannel - this.numInputChannels1);
                        } else {
                            if (!streamElement2.isLatencyMarker()) {
                                StreamRecord<?> asRecord2 = streamElement2.asRecord();
                                synchronized (this.lock) {
                                    this.numRecordsIn.inc();
                                    this.streamOperator.setKeyContextElement2(asRecord2);
                                    this.streamOperator.processElement2(asRecord2);
                                }
                                return true;
                            }
                            synchronized (this.lock) {
                                this.streamOperator.processLatencyMarker2(streamElement2.asLatencyMarker());
                            }
                        }
                    }
                }
            }
            Optional<BufferOrEvent> pollNext = this.barrierHandler.pollNext();
            if (pollNext.isPresent()) {
                processBufferOrEvent(pollNext.get());
            } else {
                if (this.barrierHandler.isFinished()) {
                    this.isFinished = true;
                    if (this.barrierHandler.isEmpty()) {
                        return false;
                    }
                    throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
                }
                this.barrierHandler.isAvailable().get();
            }
        }
    }

    private void processBufferOrEvent(BufferOrEvent bufferOrEvent) throws Exception {
        if (bufferOrEvent.isBuffer()) {
            this.currentChannel = bufferOrEvent.getChannelIndex();
            this.currentRecordDeserializer = this.recordDeserializers[this.currentChannel];
            this.currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
        } else {
            AbstractEvent event = bufferOrEvent.getEvent();
            if (event.getClass() != EndOfPartitionEvent.class) {
                throw new IOException("Unexpected event: " + event);
            }
            handleEndOfPartitionEvent(bufferOrEvent.getChannelIndex());
        }
    }

    private void handleEndOfPartitionEvent(int i) throws Exception {
        int i2 = -1;
        if (i < this.numInputChannels1) {
            this.finishedChannels1.set(i);
            if (this.finishedChannels1.cardinality() == this.numInputChannels1) {
                i2 = 1;
            }
        } else {
            this.finishedChannels2.set(i - this.numInputChannels1);
            if (this.finishedChannels2.cardinality() == this.numInputChannels2) {
                i2 = 2;
            }
        }
        if (i2 > 0) {
            synchronized (this.lock) {
                this.operatorChain.endInput(i2);
            }
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        for (RecordDeserializer<DeserializationDelegate<StreamElement>> recordDeserializer : this.recordDeserializers) {
            Buffer currentBuffer = recordDeserializer.getCurrentBuffer();
            if (currentBuffer != null && !currentBuffer.isRecycled()) {
                currentBuffer.recycleBuffer();
            }
            recordDeserializer.clear();
        }
        this.barrierHandler.cleanup();
    }
}
