package org.apache.flink.streaming.runtime.io;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.class */
public final class StreamMultipleInputProcessor implements StreamInputProcessor {
    private final MultipleInputSelectionHandler inputSelectionHandler;
    private final InputProcessor<?>[] inputProcessors;
    private final OperatorChain<?, ?> operatorChain;
    private final StreamStatus[] streamStatuses;
    private final Counter numRecordsIn;
    private int lastReadInputIndex = 1;
    private boolean isPrepared;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor$InputProcessor.class */
    public class InputProcessor<T> implements Closeable {
        private final StreamTaskNetworkOutput<T> dataOutput;
        private final StreamTaskNetworkInput<T> networkInput;

        public InputProcessor(StreamTaskNetworkOutput<T> streamTaskNetworkOutput, StreamTaskNetworkInput<T> streamTaskNetworkInput) {
            this.dataOutput = streamTaskNetworkOutput;
            this.networkInput = streamTaskNetworkInput;
        }

        public InputStatus processInput() throws Exception {
            return this.networkInput.emitNext(this.dataOutput);
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            this.networkInput.close();
        }

        public CompletableFuture<?> prepareSnapshot(ChannelStateWriter channelStateWriter, long j) throws IOException {
            return this.networkInput.prepareSnapshot(channelStateWriter, j);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor$StreamTaskNetworkOutput.class */
    public class StreamTaskNetworkOutput<T> extends AbstractDataOutput<T> {
        private final Input<T> input;
        private final WatermarkGauge inputWatermarkGauge;
        private final int inputIndex;

        private StreamTaskNetworkOutput(Input<T> input, StreamStatusMaintainer streamStatusMaintainer, WatermarkGauge watermarkGauge, int i) {
            super(streamStatusMaintainer);
            this.input = (Input) Preconditions.checkNotNull(input);
            this.inputWatermarkGauge = (WatermarkGauge) Preconditions.checkNotNull(watermarkGauge);
            this.inputIndex = i;
        }

        @Override // org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput
        public void emitRecord(StreamRecord<T> streamRecord) throws Exception {
            this.input.setKeyContextElement(streamRecord);
            this.input.processElement(streamRecord);
            StreamMultipleInputProcessor.this.numRecordsIn.inc();
            StreamMultipleInputProcessor.this.inputSelectionHandler.nextSelection();
        }

        @Override // org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput
        public void emitWatermark(Watermark watermark) throws Exception {
            this.inputWatermarkGauge.setCurrentWatermark(watermark.getTimestamp());
            this.input.processWatermark(watermark);
        }

        @Override // org.apache.flink.streaming.runtime.io.AbstractDataOutput, org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput
        public void emitStreamStatus(StreamStatus streamStatus) {
            StreamMultipleInputProcessor.this.streamStatuses[this.inputIndex] = streamStatus;
            if (streamStatus.equals(this.streamStatusMaintainer.getStreamStatus())) {
                return;
            }
            if (streamStatus.isActive()) {
                this.streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
            } else if (StreamMultipleInputProcessor.this.allStreamStatusesAreIdle()) {
                this.streamStatusMaintainer.toggleStreamStatus(StreamStatus.IDLE);
            }
        }

        @Override // org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput
        public void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception {
            this.input.processLatencyMarker(latencyMarker);
        }
    }

    public StreamMultipleInputProcessor(CheckpointedInputGate[] checkpointedInputGateArr, TypeSerializer<?>[] typeSerializerArr, IOManager iOManager, StreamStatusMaintainer streamStatusMaintainer, MultipleInputStreamOperator<?> multipleInputStreamOperator, MultipleInputSelectionHandler multipleInputSelectionHandler, WatermarkGauge[] watermarkGaugeArr, OperatorChain<?, ?> operatorChain, Counter counter) {
        this.inputSelectionHandler = (MultipleInputSelectionHandler) Preconditions.checkNotNull(multipleInputSelectionHandler);
        List<Input> inputs = multipleInputStreamOperator.getInputs();
        int size = inputs.size();
        this.inputProcessors = new InputProcessor[size];
        this.streamStatuses = new StreamStatus[size];
        this.numRecordsIn = counter;
        for (int i = 0; i < size; i++) {
            this.streamStatuses[i] = StreamStatus.ACTIVE;
            StreamTaskNetworkOutput streamTaskNetworkOutput = new StreamTaskNetworkOutput(inputs.get(i), streamStatusMaintainer, watermarkGaugeArr[i], i);
            this.inputProcessors[i] = new InputProcessor<>(streamTaskNetworkOutput, new StreamTaskNetworkInput(checkpointedInputGateArr[i], typeSerializerArr[i], iOManager, new StatusWatermarkValve(checkpointedInputGateArr[i].getNumberOfInputChannels(), streamTaskNetworkOutput), i));
        }
        this.operatorChain = (OperatorChain) Preconditions.checkNotNull(operatorChain);
    }

    public CompletableFuture<?> getAvailableFuture() {
        if (this.inputSelectionHandler.isAnyInputAvailable() || this.inputSelectionHandler.areAllInputsFinished()) {
            return AVAILABLE;
        }
        CompletableFuture<?> completableFuture = new CompletableFuture<>();
        for (int i = 0; i < this.inputProcessors.length; i++) {
            if (!this.inputSelectionHandler.isInputFinished(i) && this.inputSelectionHandler.isInputSelected(i)) {
                ((InputProcessor) this.inputProcessors[i]).networkInput.getAvailableFuture().thenRun(() -> {
                    completableFuture.complete(null);
                });
            }
        }
        return completableFuture;
    }

    @Override // org.apache.flink.streaming.runtime.io.StreamInputProcessor
    public InputStatus processInput() throws Exception {
        int selectNextReadingInputIndex = this.isPrepared ? selectNextReadingInputIndex() : selectFirstReadingInputIndex();
        if (selectNextReadingInputIndex == -1) {
            return InputStatus.NOTHING_AVAILABLE;
        }
        this.lastReadInputIndex = selectNextReadingInputIndex;
        InputStatus processInput = this.inputProcessors[selectNextReadingInputIndex].processInput();
        checkFinished(processInput, selectNextReadingInputIndex);
        return this.inputSelectionHandler.updateStatus(processInput, selectNextReadingInputIndex);
    }

    private int selectFirstReadingInputIndex() {
        this.inputSelectionHandler.nextSelection();
        this.isPrepared = true;
        return selectNextReadingInputIndex();
    }

    private void checkFinished(InputStatus inputStatus, int i) throws Exception {
        if (inputStatus == InputStatus.END_OF_INPUT) {
            this.operatorChain.endHeadOperatorInput(getInputId(i));
            this.inputSelectionHandler.nextSelection();
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        IOException iOException = null;
        for (InputProcessor<?> inputProcessor : this.inputProcessors) {
            try {
                inputProcessor.close();
            } catch (IOException e) {
                iOException = (IOException) ExceptionUtils.firstOrSuppressed(e, iOException);
            }
        }
        if (iOException != null) {
            throw iOException;
        }
    }

    @Override // org.apache.flink.streaming.runtime.io.StreamInputProcessor
    public CompletableFuture<Void> prepareSnapshot(ChannelStateWriter channelStateWriter, long j) throws IOException {
        CompletableFuture[] completableFutureArr = new CompletableFuture[this.inputProcessors.length];
        for (int i = 0; i < completableFutureArr.length; i++) {
            completableFutureArr[i] = this.inputProcessors[i].prepareSnapshot(channelStateWriter, j);
        }
        return CompletableFuture.allOf(completableFutureArr);
    }

    private int selectNextReadingInputIndex() {
        if (!this.inputSelectionHandler.isAnyInputAvailable()) {
            fullCheckAndSetAvailable();
        }
        int selectNextInputIndex = this.inputSelectionHandler.selectNextInputIndex(this.lastReadInputIndex);
        if (selectNextInputIndex == -1) {
            return -1;
        }
        if (this.inputSelectionHandler.shouldSetAvailableForAnotherInput()) {
            fullCheckAndSetAvailable();
        }
        return selectNextInputIndex;
    }

    private void fullCheckAndSetAvailable() {
        for (int i = 0; i < this.inputProcessors.length; i++) {
            InputProcessor<?> inputProcessor = this.inputProcessors[i];
            if (((InputProcessor) inputProcessor).networkInput.isApproximatelyAvailable() || ((InputProcessor) inputProcessor).networkInput.isAvailable()) {
                this.inputSelectionHandler.setAvailableInput(i);
            }
        }
    }

    private int getInputId(int i) {
        return i + 1;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean allStreamStatusesAreIdle() {
        for (StreamStatus streamStatus : this.streamStatuses) {
            if (streamStatus.isActive()) {
                return false;
            }
        }
        return true;
    }
}
