/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint.channel;

import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateSerializer;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateSerializerImpl;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateStreamReader;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.checkpoint.channel.RefCountingFSDataInputStream;
import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.AbstractChannelStateHandle;
import org.apache.flink.shaded.guava18.com.google.common.io.Closer;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
@Internal
public class ChannelStateReaderImpl
implements ChannelStateReader {
    private static final Logger log = LoggerFactory.getLogger(ChannelStateReaderImpl.class);
    private final Map<InputChannelInfo, ChannelStateStreamReader> inputChannelHandleReaders;
    private final Map<ResultSubpartitionInfo, ChannelStateStreamReader> resultSubpartitionHandleReaders;
    private boolean isClosed = false;

    public ChannelStateReaderImpl(TaskStateSnapshot snapshot) {
        this(snapshot, new ChannelStateSerializerImpl());
    }

    ChannelStateReaderImpl(TaskStateSnapshot snapshot, ChannelStateSerializer serializer) {
        RefCountingFSDataInputStream.RefCountingFSDataInputStreamFactory streamFactory = new RefCountingFSDataInputStream.RefCountingFSDataInputStreamFactory(serializer);
        HashMap<InputChannelInfo, ChannelStateStreamReader> inputChannelHandleReadersTmp = new HashMap<InputChannelInfo, ChannelStateStreamReader>();
        HashMap<ResultSubpartitionInfo, ChannelStateStreamReader> resultSubpartitionHandleReadersTmp = new HashMap<ResultSubpartitionInfo, ChannelStateStreamReader>();
        for (Map.Entry<OperatorID, OperatorSubtaskState> e : snapshot.getSubtaskStateMappings()) {
            this.addReaders(inputChannelHandleReadersTmp, e.getValue().getInputChannelState(), streamFactory);
            this.addReaders(resultSubpartitionHandleReadersTmp, e.getValue().getResultSubpartitionState(), streamFactory);
        }
        this.inputChannelHandleReaders = inputChannelHandleReadersTmp;
        this.resultSubpartitionHandleReaders = resultSubpartitionHandleReadersTmp;
    }

    private <T> void addReaders(Map<T, ChannelStateStreamReader> readerMap, Collection<? extends AbstractChannelStateHandle<T>> handles, RefCountingFSDataInputStream.RefCountingFSDataInputStreamFactory streamFactory) {
        for (AbstractChannelStateHandle<T> handle : handles) {
            Preconditions.checkState((!readerMap.containsKey(handle.getInfo()) ? 1 : 0) != 0, (Object)("multiple states exist for channel: " + handle.getInfo()));
            readerMap.put(handle.getInfo(), new ChannelStateStreamReader(handle, streamFactory));
        }
    }

    @Override
    public boolean hasChannelStates() {
        return !this.inputChannelHandleReaders.isEmpty() || !this.resultSubpartitionHandleReaders.isEmpty();
    }

    @Override
    public ChannelStateReader.ReadResult readInputData(InputChannelInfo info, Buffer buffer) throws IOException {
        Preconditions.checkState((!this.isClosed ? 1 : 0) != 0, (Object)"reader is closed");
        log.debug("readInputData, resultSubpartitionInfo: {} , buffer {}", (Object)info, (Object)buffer);
        ChannelStateStreamReader reader = this.inputChannelHandleReaders.get(info);
        return reader == null ? ChannelStateReader.ReadResult.NO_MORE_DATA : reader.readInto(buffer);
    }

    @Override
    public ChannelStateReader.ReadResult readOutputData(ResultSubpartitionInfo info, BufferBuilder bufferBuilder) throws IOException {
        Preconditions.checkState((!this.isClosed ? 1 : 0) != 0, (Object)"reader is closed");
        log.debug("readOutputData, resultSubpartitionInfo: {} , bufferBuilder {}", (Object)info, (Object)bufferBuilder);
        ChannelStateStreamReader reader = this.resultSubpartitionHandleReaders.get(info);
        return reader == null ? ChannelStateReader.ReadResult.NO_MORE_DATA : reader.readInto(bufferBuilder);
    }

    @Override
    public void close() throws Exception {
        this.isClosed = true;
        try (Closer closer = Closer.create();){
            for (Map map : Arrays.asList(this.inputChannelHandleReaders, this.resultSubpartitionHandleReaders)) {
                for (ChannelStateStreamReader reader : map.values()) {
                    closer.register((Closeable)reader);
                }
                map.clear();
            }
        }
    }
}

