/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
import org.apache.flink.runtime.state.AbstractChannelStateHandle;
import org.apache.flink.runtime.state.AbstractMergedChannelStateHandle;
import org.apache.flink.runtime.state.ChannelState;
import org.apache.flink.runtime.state.InputChannelStateHandle;
import org.apache.flink.runtime.state.InputStateHandle;
import org.apache.flink.runtime.state.MergedInputChannelStateHandle;
import org.apache.flink.runtime.state.MergedResultSubpartitionStateHandle;
import org.apache.flink.runtime.state.OutputStateHandle;
import org.apache.flink.runtime.state.ResultSubpartitionStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.function.BiConsumerWithException;
import org.apache.flink.util.function.FunctionWithException;

/**
 * Static helpers for channel state handles: binary (de)serialization of channel identifiers, and
 * conversions between per-channel handles and their merged counterparts.
 *
 * <p>The class holds no state and cannot be instantiated.
 */
public final class ChannelStateHelper {
    /** Writes a {@link ResultSubpartitionInfo} as two ints: partition index, then sub-partition index. */
    public static final BiConsumerWithException<ResultSubpartitionInfo, DataOutputStream, IOException> RESULT_SUBPARTITION_INFO_WRITER = (info, out) -> {
        out.writeInt(info.getPartitionIdx());
        out.writeInt(info.getSubPartitionIdx());
    };

    /** Writes an {@link InputChannelInfo} as two ints: gate index, then input channel index. */
    public static final BiConsumerWithException<InputChannelInfo, DataOutputStream, IOException> INPUT_CHANNEL_INFO_WRITER = (info, out) -> {
        out.writeInt(info.getGateIdx());
        out.writeInt(info.getInputChannelIdx());
    };

    /**
     * Reads two ints and passes them, in read order, to the {@link ResultSubpartitionInfo}
     * constructor. Presumably the inverse of {@link #RESULT_SUBPARTITION_INFO_WRITER}; that relies
     * on the constructor taking (partition index, sub-partition index) - confirm against the class.
     */
    public static final FunctionWithException<DataInputStream, ResultSubpartitionInfo, IOException> RESULT_SUBPARTITION_INFO_READER = in -> new ResultSubpartitionInfo(in.readInt(), in.readInt());

    /**
     * Reads two ints and passes them, in read order, to the {@link InputChannelInfo} constructor.
     * Presumably the inverse of {@link #INPUT_CHANNEL_INFO_WRITER}; that relies on the constructor
     * taking (gate index, input channel index) - confirm against the class.
     */
    public static final FunctionWithException<DataInputStream, InputChannelInfo, IOException> INPUT_CHANNEL_INFO_READER = in -> new InputChannelInfo(in.readInt(), in.readInt());

    /** Utility class; not meant to be instantiated. */
    private ChannelStateHelper() {
    }

    /**
     * Flattens the given collections of channel state and maps every handle to its delegate
     * {@link StreamStateHandle}, keeping each distinct delegate only once.
     *
     * <p>The returned stream is lazy: the {@link IllegalStateException} for an unsupported handle
     * type is raised when the stream is consumed, not when this method returns.
     *
     * @param stateCollections stream of channel state collections to inspect
     * @return stream of distinct delegate handles (as decided by {@link Stream#distinct()})
     * @throws IllegalStateException on consumption, if a handle is neither an
     *     {@link AbstractChannelStateHandle} nor an {@link AbstractMergedChannelStateHandle}
     */
    public static Stream<StreamStateHandle> collectUniqueDisposableInChannelState(Stream<StateObjectCollection<? extends ChannelState>> stateCollections) {
        return stateCollections.flatMap(Collection::stream).map(handle -> {
            // The casts stay raw on purpose: only the non-generic getDelegate() is needed, so the
            // handles' type arguments are irrelevant here.
            if (handle instanceof AbstractChannelStateHandle) {
                return ((AbstractChannelStateHandle) handle).getDelegate();
            }
            if (handle instanceof AbstractMergedChannelStateHandle) {
                return ((AbstractMergedChannelStateHandle) handle).getDelegate();
            }
            throw new IllegalStateException("Not Supported state handle : " + handle.getClass());
        }).distinct();
    }

    /**
     * Re-types a collection of {@link InputChannelStateHandle}s as a collection of
     * {@link InputStateHandle}s. The handles themselves are reused; only the container is new.
     *
     * @param handles per-channel input handles; may be {@code null} or empty
     * @return {@link StateObjectCollection#empty()} for {@code null}/empty input, otherwise a
     *     collection holding the same handles in the iteration order of {@code handles}
     */
    public static StateObjectCollection<InputStateHandle> castToInputStateCollection(Collection<InputChannelStateHandle> handles) {
        if (handles == null || handles.isEmpty()) {
            return StateObjectCollection.empty();
        }
        // Widening happens in the collector's element type, so no identity map() stage is needed.
        List<InputStateHandle> widened = handles.stream().collect(Collectors.<InputStateHandle>toList());
        return new StateObjectCollection<>(widened);
    }

    /**
     * Re-types a collection of {@link ResultSubpartitionStateHandle}s as a collection of
     * {@link OutputStateHandle}s. The handles themselves are reused; only the container is new.
     *
     * @param handles per-subpartition output handles; may be {@code null} or empty
     * @return {@link StateObjectCollection#empty()} for {@code null}/empty input, otherwise a
     *     collection holding the same handles in the iteration order of {@code handles}
     */
    public static StateObjectCollection<OutputStateHandle> castToOutputStateCollection(Collection<ResultSubpartitionStateHandle> handles) {
        if (handles == null || handles.isEmpty()) {
            return StateObjectCollection.empty();
        }
        // Widening happens in the collector's element type, so no identity map() stage is needed.
        List<OutputStateHandle> widened = handles.stream().collect(Collectors.<OutputStateHandle>toList());
        return new StateObjectCollection<>(widened);
    }

    /**
     * Groups the given input channel handles by their delegate and turns every group into one
     * merged handle via {@link MergedInputChannelStateHandle#fromChannelHandles}.
     *
     * @param handles per-channel input handles; may be {@code null} or empty
     * @return {@link StateObjectCollection#empty()} for {@code null}/empty input, otherwise a
     *     collection backed by a set with one merged handle per distinct delegate
     */
    public static StateObjectCollection<InputStateHandle> mergeInputStateCollection(Collection<InputChannelStateHandle> handles) {
        if (handles == null || handles.isEmpty()) {
            return StateObjectCollection.empty();
        }
        Collection<InputStateHandle> mergedHandles = handles.stream()
                .collect(Collectors.groupingBy(AbstractChannelStateHandle::getDelegate))
                .values()
                .stream()
                .map(MergedInputChannelStateHandle::fromChannelHandles)
                .collect(Collectors.<InputStateHandle>toSet());
        return new StateObjectCollection<>(mergedHandles);
    }

    /**
     * Groups the given result subpartition handles by their delegate and turns every group into
     * one merged handle via {@link MergedResultSubpartitionStateHandle#fromChannelHandles}.
     *
     * @param handles per-subpartition output handles; may be {@code null} or empty
     * @return {@link StateObjectCollection#empty()} for {@code null}/empty input, otherwise a
     *     collection backed by a set with one merged handle per distinct delegate
     */
    public static StateObjectCollection<OutputStateHandle> mergeOutputStateCollection(Collection<ResultSubpartitionStateHandle> handles) {
        if (handles == null || handles.isEmpty()) {
            return StateObjectCollection.empty();
        }
        Collection<OutputStateHandle> mergedHandles = handles.stream()
                .collect(Collectors.groupingBy(AbstractChannelStateHandle::getDelegate))
                .values()
                .stream()
                .map(MergedResultSubpartitionStateHandle::fromChannelHandles)
                .collect(Collectors.<OutputStateHandle>toSet());
        return new StateObjectCollection<>(mergedHandles);
    }

    /**
     * Returns the input channel state of a subtask as plain per-channel handles: unmerged handles
     * are passed through, merged handles are replaced by the result of
     * {@code getUnmergedHandles()}.
     *
     * @param subtaskState subtask state to read the input channel state from; not null-checked
     * @return {@link StateObjectCollection#empty()} if no handle results, otherwise a collection
     *     of all per-channel handles in encounter order
     * @throws IllegalStateException if a handle is neither an {@link InputChannelStateHandle} nor
     *     a {@link MergedInputChannelStateHandle}
     */
    public static StateObjectCollection<InputChannelStateHandle> extractUnmergedInputHandles(OperatorSubtaskState subtaskState) {
        List<InputChannelStateHandle> unmerged = subtaskState.getInputChannelState().stream().flatMap(handle -> {
            if (handle instanceof InputChannelStateHandle) {
                return Stream.of((InputChannelStateHandle) handle);
            }
            if (handle instanceof MergedInputChannelStateHandle) {
                return ((MergedInputChannelStateHandle) handle).getUnmergedHandles().stream();
            }
            throw new IllegalStateException("Invalid input channel state : " + handle.getClass());
        }).collect(Collectors.toList());
        if (unmerged.isEmpty()) {
            return StateObjectCollection.empty();
        }
        return new StateObjectCollection<>(unmerged);
    }

    /**
     * Returns the result subpartition state of a subtask as plain per-subpartition handles:
     * unmerged handles are passed through, merged handles are replaced by the result of
     * {@code getUnmergedHandles()}.
     *
     * @param subtaskState subtask state to read the result subpartition state from; not
     *     null-checked
     * @return {@link StateObjectCollection#empty()} if no handle results, otherwise a collection
     *     of all per-subpartition handles in encounter order
     * @throws IllegalStateException if a handle is neither a {@link ResultSubpartitionStateHandle}
     *     nor a {@link MergedResultSubpartitionStateHandle}
     */
    public static StateObjectCollection<ResultSubpartitionStateHandle> extractUnmergedOutputHandles(OperatorSubtaskState subtaskState) {
        List<ResultSubpartitionStateHandle> unmerged = subtaskState.getResultSubpartitionState().stream().flatMap(handle -> {
            if (handle instanceof ResultSubpartitionStateHandle) {
                return Stream.of((ResultSubpartitionStateHandle) handle);
            }
            if (handle instanceof MergedResultSubpartitionStateHandle) {
                return ((MergedResultSubpartitionStateHandle) handle).getUnmergedHandles().stream();
            }
            throw new IllegalStateException("Invalid output channel state : " + handle.getClass());
        }).collect(Collectors.toList());
        if (unmerged.isEmpty()) {
            return StateObjectCollection.empty();
        }
        return new StateObjectCollection<>(unmerged);
    }
}

