/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.watermark.Watermark;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.event.WatermarkEvent;
import org.apache.flink.runtime.io.network.api.EndOfData;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.EndOfOutputChannelStateEvent;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
import org.apache.flink.streaming.runtime.io.DataInputStatus;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.io.RecordAttributesCombiner;
import org.apache.flink.streaming.runtime.io.StreamTaskInput;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.watermark.AbstractInternalWatermarkDeclaration;
import org.apache.flink.streaming.runtime.watermark.WatermarkCombiner;
import org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve;
import org.apache.flink.streaming.util.watermark.WatermarkUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;

public abstract class AbstractStreamTaskNetworkInput<T, R extends RecordDeserializer<DeserializationDelegate<StreamElement>>>
implements StreamTaskInput<T> {
    protected final CheckpointedInputGate checkpointedInputGate;
    protected final DeserializationDelegate<StreamElement> deserializationDelegate;
    protected final TypeSerializer<T> inputSerializer;
    protected final Map<InputChannelInfo, R> recordDeserializers;
    protected final Map<InputChannelInfo, Integer> flattenedChannelIndices = new HashMap<InputChannelInfo, Integer>();
    protected final StatusWatermarkValve statusWatermarkValve;
    protected final int inputIndex;
    private final RecordAttributesCombiner recordAttributesCombiner;
    private InputChannelInfo lastChannel = null;
    private R currentRecordDeserializer = null;
    protected final Map<String, WatermarkCombiner> watermarkCombiners = new HashMap<String, WatermarkCombiner>();
    protected final StreamTask.CanEmitBatchOfRecordsChecker canEmitBatchOfRecords;

    public AbstractStreamTaskNetworkInput(CheckpointedInputGate checkpointedInputGate, TypeSerializer<T> inputSerializer, StatusWatermarkValve statusWatermarkValve, int inputIndex, Map<InputChannelInfo, R> recordDeserializers, StreamTask.CanEmitBatchOfRecordsChecker canEmitBatchOfRecords) {
        this(checkpointedInputGate, inputSerializer, statusWatermarkValve, inputIndex, recordDeserializers, canEmitBatchOfRecords, Collections.emptySet());
    }

    public AbstractStreamTaskNetworkInput(CheckpointedInputGate checkpointedInputGate, TypeSerializer<T> inputSerializer, StatusWatermarkValve statusWatermarkValve, int inputIndex, Map<InputChannelInfo, R> recordDeserializers, StreamTask.CanEmitBatchOfRecordsChecker canEmitBatchOfRecords, Set<AbstractInternalWatermarkDeclaration<?>> watermarkDeclarationSet) {
        this.checkpointedInputGate = checkpointedInputGate;
        this.deserializationDelegate = new NonReusingDeserializationDelegate<StreamElement>(new StreamElementSerializer<T>(inputSerializer));
        this.inputSerializer = inputSerializer;
        for (InputChannelInfo inputChannelInfo : checkpointedInputGate.getChannelInfos()) {
            this.flattenedChannelIndices.put(inputChannelInfo, this.flattenedChannelIndices.size());
        }
        this.statusWatermarkValve = (StatusWatermarkValve)Preconditions.checkNotNull((Object)statusWatermarkValve);
        this.inputIndex = inputIndex;
        this.recordDeserializers = (Map)Preconditions.checkNotNull(recordDeserializers);
        this.canEmitBatchOfRecords = (StreamTask.CanEmitBatchOfRecordsChecker)Preconditions.checkNotNull((Object)canEmitBatchOfRecords);
        this.recordAttributesCombiner = new RecordAttributesCombiner(checkpointedInputGate.getNumberOfInputChannels());
        WatermarkUtils.addEventTimeWatermarkCombinerIfNeeded(watermarkDeclarationSet, this.watermarkCombiners, this.flattenedChannelIndices.size());
        for (AbstractInternalWatermarkDeclaration abstractInternalWatermarkDeclaration : watermarkDeclarationSet) {
            if (this.watermarkCombiners.containsKey(abstractInternalWatermarkDeclaration.getIdentifier())) continue;
            this.watermarkCombiners.put(abstractInternalWatermarkDeclaration.getIdentifier(), abstractInternalWatermarkDeclaration.createWatermarkCombiner(this.flattenedChannelIndices.size(), () -> {
                try {
                    checkpointedInputGate.resumeGateConsumption();
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }));
        }
    }

    @Override
    public DataInputStatus emitNext(PushingAsyncDataInput.DataOutput<T> output) throws Exception {
        block8: {
            DataInputStatus status;
            while (true) {
                Optional<BufferOrEvent> bufferOrEvent;
                if (this.currentRecordDeserializer != null) {
                    RecordDeserializer.DeserializationResult result;
                    try {
                        result = this.currentRecordDeserializer.getNextRecord(this.deserializationDelegate);
                    }
                    catch (IOException e) {
                        throw new IOException(String.format("Can't get next record for channel %s", this.lastChannel), e);
                    }
                    if (result.isBufferConsumed()) {
                        this.currentRecordDeserializer = null;
                    }
                    if (result.isFullRecord()) {
                        boolean breakBatchEmitting = this.processElement(this.deserializationDelegate.getInstance(), output);
                        if (this.canEmitBatchOfRecords.check() && !breakBatchEmitting) continue;
                        return DataInputStatus.MORE_AVAILABLE;
                    }
                }
                if (!(bufferOrEvent = this.checkpointedInputGate.pollNext()).isPresent()) break block8;
                if (bufferOrEvent.get().isBuffer()) {
                    this.processBuffer(bufferOrEvent.get());
                    continue;
                }
                status = this.processEvent(bufferOrEvent.get(), output);
                if (status != DataInputStatus.MORE_AVAILABLE || !this.canEmitBatchOfRecords.check()) break;
            }
            return status;
        }
        if (this.checkpointedInputGate.isFinished()) {
            Preconditions.checkState((boolean)this.checkpointedInputGate.getAvailableFuture().isDone(), (Object)"Finished BarrierHandler should be available");
            return DataInputStatus.END_OF_INPUT;
        }
        return DataInputStatus.NOTHING_AVAILABLE;
    }

    private boolean processElement(StreamElement streamElement, PushingAsyncDataInput.DataOutput<T> output) throws Exception {
        if (streamElement.isRecord()) {
            output.emitRecord(streamElement.asRecord());
            return false;
        }
        if (streamElement.isWatermark()) {
            this.statusWatermarkValve.inputWatermark(streamElement.asWatermark(), this.flattenedChannelIndices.get(this.lastChannel), output);
            return false;
        }
        if (streamElement.isLatencyMarker()) {
            output.emitLatencyMarker(streamElement.asLatencyMarker());
            return false;
        }
        if (streamElement.isWatermarkStatus()) {
            this.statusWatermarkValve.inputWatermarkStatus(streamElement.asWatermarkStatus(), this.flattenedChannelIndices.get(this.lastChannel), output);
            return false;
        }
        if (streamElement.isRecordAttributes()) {
            this.recordAttributesCombiner.inputRecordAttributes(streamElement.asRecordAttributes(), this.flattenedChannelIndices.get(this.lastChannel), output);
            return true;
        }
        throw new UnsupportedOperationException("Unknown type of StreamElement");
    }

    private void processWatermarkEvent(InputChannelInfo inputChannelInfo, WatermarkEvent generalizedWatermarkEvent, PushingAsyncDataInput.DataOutput<T> output) throws Exception {
        Watermark watermark = generalizedWatermarkEvent.getWatermark();
        WatermarkCombiner combiner = this.watermarkCombiners.get(watermark.getIdentifier());
        combiner.combineWatermark(watermark, inputChannelInfo.getInputChannelIdx(), outputWatermark -> {
            try {
                output.emitWatermark(new WatermarkEvent((Watermark)outputWatermark, generalizedWatermarkEvent.isAligned()));
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }

    protected DataInputStatus processEvent(BufferOrEvent bufferOrEvent, PushingAsyncDataInput.DataOutput<T> output) {
        AbstractEvent event = bufferOrEvent.getEvent();
        if (event.getClass() == EndOfData.class) {
            switch (this.checkpointedInputGate.hasReceivedEndOfData()) {
                case NOT_END_OF_DATA: {
                    break;
                }
                case DRAINED: {
                    return DataInputStatus.END_OF_DATA;
                }
                case STOPPED: {
                    return DataInputStatus.STOPPED;
                }
            }
        } else if (event.getClass() == EndOfPartitionEvent.class) {
            this.releaseDeserializer(bufferOrEvent.getChannelInfo());
            if (this.checkpointedInputGate.isFinished()) {
                return DataInputStatus.END_OF_INPUT;
            }
        } else if (event.getClass() == EndOfOutputChannelStateEvent.class) {
            if (this.checkpointedInputGate.allChannelsRecovered()) {
                return DataInputStatus.END_OF_RECOVERY;
            }
        } else if (event.getClass() == WatermarkEvent.class) {
            try {
                this.processWatermarkEvent(bufferOrEvent.getChannelInfo(), (WatermarkEvent)event, output);
            }
            catch (Exception e) {
                ExceptionUtils.rethrow((Throwable)e);
            }
        }
        return DataInputStatus.MORE_AVAILABLE;
    }

    protected void processBuffer(BufferOrEvent bufferOrEvent) throws IOException {
        this.lastChannel = bufferOrEvent.getChannelInfo();
        Preconditions.checkState((this.lastChannel != null ? 1 : 0) != 0);
        this.currentRecordDeserializer = this.getActiveSerializer(bufferOrEvent.getChannelInfo());
        Preconditions.checkState((this.currentRecordDeserializer != null ? 1 : 0) != 0, (Object)"currentRecordDeserializer has already been released");
        this.currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
    }

    protected R getActiveSerializer(InputChannelInfo channelInfo) {
        return (R)((RecordDeserializer)this.recordDeserializers.get(channelInfo));
    }

    @Override
    public int getInputIndex() {
        return this.inputIndex;
    }

    @Override
    public CompletableFuture<?> getAvailableFuture() {
        if (this.currentRecordDeserializer != null) {
            return AVAILABLE;
        }
        return this.checkpointedInputGate.getAvailableFuture();
    }

    @Override
    public void close() throws IOException {
        for (InputChannelInfo channelInfo : new ArrayList<InputChannelInfo>(this.recordDeserializers.keySet())) {
            this.releaseDeserializer(channelInfo);
        }
    }

    protected void releaseDeserializer(InputChannelInfo channelInfo) {
        RecordDeserializer deserializer = (RecordDeserializer)this.recordDeserializers.get(channelInfo);
        if (deserializer != null) {
            deserializer.clear();
            this.recordDeserializers.remove(channelInfo);
        }
    }
}

