package org.apache.flink.runtime.checkpoint.metadata;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.checkpoint.FinishedOperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.FullyFinishedOperatorState;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.InputChannelStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.ResultSubpartitionStateHandle;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.BiConsumerWithException;

/**
 * Serializer for checkpoint metadata in format version {@value #VERSION}.
 *
 * <p>Wire layout of a single operator state, as written by
 * {@link #serializeOperatorState(OperatorState, DataOutputStream)}:
 *
 * <pre>
 *   long  operator id, lower part
 *   long  operator id, upper part
 *   int   parallelism
 *   int   max parallelism
 *   ...   coordinator state (stream state handle)
 *   int   number of subtask entries, or -1 if the operator is fully finished
 *   for each subtask entry:
 *     int  subtask index, or -(index + 1) if the subtask is finished
 *     ...  subtask state (absent for finished subtasks)
 * </pre>
 *
 * <p>A subtask state consists of whatever the base class writes, followed by the collection of
 * input channel state handles and the collection of result subpartition state handles.
 */
@Internal
public class MetadataV3Serializer extends MetadataV2V3SerializerBase implements MetadataSerializer {
    /** Version number of the format handled by this serializer. */
    public static final int VERSION = 3;

    /** Shared instance; also used by the static test helpers at the bottom of this class. */
    public static final MetadataV3Serializer INSTANCE = new MetadataV3Serializer();

    /** Delegate that reads and writes input channel and result subpartition state handles. */
    private final ChannelStateHandleSerializer channelStateHandleSerializer = new ChannelStateHandleSerializer();

    /**
     * Result of decoding one subtask header: the subtask index together with the flag telling
     * whether that subtask is finished.
     *
     * <p>Decompiler note: the access modifier of this class was widened from package-private.
     */
    public static class SubtaskAndFinishedState {
        /** Index of the subtask. */
        final int subtaskIndex;

        /** {@code true} if the subtask was recorded as finished. */
        final boolean isFinished;

        /**
         * @param i the subtask index
         * @param z whether the subtask is finished
         */
        public SubtaskAndFinishedState(int i, boolean z) {
            this.subtaskIndex = i;
            this.isFinished = z;
        }
    }

    /**
     * Returns the version of the metadata format handled by this serializer.
     *
     * @return {@link #VERSION}
     */
    public int getVersion() {
        return VERSION;
    }

    /**
     * Writes the given checkpoint metadata to the stream by delegating to
     * {@code serializeMetadata}.
     *
     * @param checkpointMetadata the metadata to write
     * @param dataOutputStream the stream to write to
     * @throws IOException if writing fails
     */
    @Override
    public void serialize(CheckpointMetadata checkpointMetadata, DataOutputStream dataOutputStream) throws IOException {
        serializeMetadata(checkpointMetadata, dataOutputStream);
    }

    /**
     * Reads checkpoint metadata from the stream by delegating to {@code deserializeMetadata}.
     *
     * @param dataInputStream the stream to read from
     * @param classLoader not used by this implementation
     * @param str forwarded unchanged to {@code deserializeMetadata}; presumably the external
     *     pointer of the checkpoint (TODO confirm against {@code MetadataSerializer})
     * @return the metadata read from the stream
     * @throws IOException if reading fails
     */
    @Override
    public CheckpointMetadata deserialize(DataInputStream dataInputStream, ClassLoader classLoader, String str) throws IOException {
        return deserializeMetadata(dataInputStream, str);
    }

    /**
     * Writes one operator state using the layout described in the class documentation.
     *
     * <p>Decompiler note: the access modifier of this method was widened from protected.
     *
     * @param operatorState the operator state to write
     * @param dataOutputStream the stream to write to
     * @throws IOException if writing fails
     */
    @Override
    public void serializeOperatorState(OperatorState operatorState, DataOutputStream dataOutputStream) throws IOException {
        final OperatorID operatorId = operatorState.getOperatorID();
        dataOutputStream.writeLong(operatorId.getLowerPart());
        dataOutputStream.writeLong(operatorId.getUpperPart());
        dataOutputStream.writeInt(operatorState.getParallelism());
        dataOutputStream.writeInt(operatorState.getMaxParallelism());
        serializeStreamStateHandle(operatorState.getCoordinatorState(), dataOutputStream);

        if (operatorState.isFullyFinished()) {
            // A negative subtask count marks a fully finished operator; nothing else follows.
            dataOutputStream.writeInt(-1);
            return;
        }

        final Map<Integer, OperatorSubtaskState> subtaskStates = operatorState.getSubtaskStates();
        dataOutputStream.writeInt(subtaskStates.size());
        for (Map.Entry<Integer, OperatorSubtaskState> entry : subtaskStates.entrySet()) {
            final OperatorSubtaskState subtaskState = entry.getValue();
            final boolean finished = subtaskState.isFinished();
            serializeSubtaskIndexAndFinishedState(entry.getKey(), finished, dataOutputStream);
            // Finished subtasks are fully described by their header and carry no payload.
            if (!finished) {
                serializeSubtaskState(subtaskState, dataOutputStream);
            }
        }
    }

    /**
     * Writes a subtask header as a single int: the index itself for a running subtask, or
     * {@code -(index + 1)} for a finished one, so that index 0 can still be told apart.
     */
    private void serializeSubtaskIndexAndFinishedState(int subtaskIndex, boolean finished, DataOutputStream dataOutputStream) throws IOException {
        dataOutputStream.writeInt(finished ? -(subtaskIndex + 1) : subtaskIndex);
    }

    /**
     * Writes the subtask state handled by the base class, followed by the input channel state
     * and the result subpartition state of the subtask.
     *
     * <p>Decompiler note: the access modifier of this method was widened from protected.
     *
     * @param operatorSubtaskState the subtask state to write
     * @param dataOutputStream the stream to write to
     * @throws IOException if writing fails
     */
    @Override
    public void serializeSubtaskState(OperatorSubtaskState operatorSubtaskState, DataOutputStream dataOutputStream) throws IOException {
        super.serializeSubtaskState(operatorSubtaskState, dataOutputStream);
        serializeCollection(operatorSubtaskState.getInputChannelState(), dataOutputStream, this::serializeInputChannelStateHandle);
        serializeCollection(operatorSubtaskState.getResultSubpartitionState(), dataOutputStream, this::serializeResultSubpartitionStateHandle);
    }

    /**
     * Reads one operator state; the exact inverse of
     * {@link #serializeOperatorState(OperatorState, DataOutputStream)}.
     *
     * @param dataInputStream the stream to read from
     * @param deserializationContext optional context forwarded to the nested deserializers
     * @return a {@link FullyFinishedOperatorState} if the stored subtask count is negative,
     *     otherwise an {@link OperatorState} populated with the stored subtask states
     * @throws IOException if reading fails
     * @throws IllegalStateException if a fully finished operator carries coordinator state
     */
    @Override
    protected OperatorState deserializeOperatorState(DataInputStream dataInputStream, @Nullable MetadataV2V3SerializerBase.DeserializationContext deserializationContext) throws IOException {
        // Same order as on the write side: lower part first, then upper part.
        final long idLowerPart = dataInputStream.readLong();
        final long idUpperPart = dataInputStream.readLong();
        final OperatorID operatorId = new OperatorID(idLowerPart, idUpperPart);
        final int parallelism = dataInputStream.readInt();
        final int maxParallelism = dataInputStream.readInt();
        final ByteStreamStateHandle coordinatorState = deserializeAndCheckByteStreamStateHandle(dataInputStream, deserializationContext);

        final int subtaskCount = dataInputStream.readInt();
        if (subtaskCount < 0) {
            // Negative count is the "fully finished" marker written by serializeOperatorState.
            Preconditions.checkState(coordinatorState == null, "Coordinator State should be null for fully finished operator state");
            return new FullyFinishedOperatorState(null, null, operatorId, parallelism, maxParallelism);
        }

        final OperatorState operatorState = new OperatorState(null, null, operatorId, parallelism, maxParallelism);
        operatorState.setCoordinatorState(coordinatorState);
        for (int remaining = subtaskCount; remaining > 0; remaining--) {
            final SubtaskAndFinishedState header = deserializeSubtaskIndexAndFinishedState(dataInputStream);
            final OperatorSubtaskState subtaskState =
                    header.isFinished
                            ? FinishedOperatorSubtaskState.INSTANCE
                            : deserializeSubtaskState(dataInputStream, deserializationContext);
            operatorState.putState(header.subtaskIndex, subtaskState);
        }
        return operatorState;
    }

    /**
     * Reads one subtask header: a non-negative int is the index of a running subtask, a negative
     * int {@code v} denotes the finished subtask with index {@code -v - 1}.
     *
     * <p>Decompiler note: the access modifier of this method was widened from protected.
     *
     * @param dataInputStream the stream to read from
     * @return the decoded index and finished flag
     * @throws IOException if reading fails
     */
    public SubtaskAndFinishedState deserializeSubtaskIndexAndFinishedState(DataInputStream dataInputStream) throws IOException {
        final int encoded = dataInputStream.readInt();
        if (encoded < 0) {
            return new SubtaskAndFinishedState(-encoded - 1, true);
        }
        return new SubtaskAndFinishedState(encoded, false);
    }

    /**
     * Writes a single result subpartition state handle via the channel state handle serializer.
     *
     * @param resultSubpartitionStateHandle the handle to write
     * @param dataOutputStream the stream to write to
     * @throws IOException if writing fails
     */
    @Override
    @VisibleForTesting
    public void serializeResultSubpartitionStateHandle(ResultSubpartitionStateHandle resultSubpartitionStateHandle, DataOutputStream dataOutputStream) throws IOException {
        channelStateHandleSerializer.serialize(resultSubpartitionStateHandle, dataOutputStream);
    }

    /**
     * Reads a collection of result subpartition state handles.
     *
     * @param dataInputStream the stream to read from
     * @param deserializationContext optional context forwarded to {@code deserializeCollection}
     * @return the handles read from the stream
     * @throws IOException if reading fails
     */
    @Override
    @VisibleForTesting
    public StateObjectCollection<ResultSubpartitionStateHandle> deserializeResultSubpartitionStateHandle(DataInputStream dataInputStream, @Nullable MetadataV2V3SerializerBase.DeserializationContext deserializationContext) throws IOException {
        // A bound method reference null-checks its receiver itself; no explicit check is needed.
        return deserializeCollection(dataInputStream, deserializationContext, channelStateHandleSerializer::deserializeResultSubpartitionStateHandle);
    }

    /**
     * Writes a single input channel state handle via the channel state handle serializer.
     *
     * @param inputChannelStateHandle the handle to write
     * @param dataOutputStream the stream to write to
     * @throws IOException if writing fails
     */
    @Override
    @VisibleForTesting
    public void serializeInputChannelStateHandle(InputChannelStateHandle inputChannelStateHandle, DataOutputStream dataOutputStream) throws IOException {
        channelStateHandleSerializer.serialize(inputChannelStateHandle, dataOutputStream);
    }

    /**
     * Reads a collection of input channel state handles.
     *
     * @param dataInputStream the stream to read from
     * @param deserializationContext optional context forwarded to {@code deserializeCollection}
     * @return the handles read from the stream
     * @throws IOException if reading fails
     */
    @Override
    @VisibleForTesting
    public StateObjectCollection<InputChannelStateHandle> deserializeInputChannelStateHandle(DataInputStream dataInputStream, @Nullable MetadataV2V3SerializerBase.DeserializationContext deserializationContext) throws IOException {
        // A bound method reference null-checks its receiver itself; no explicit check is needed.
        return deserializeCollection(dataInputStream, deserializationContext, channelStateHandleSerializer::deserializeInputChannelStateHandle);
    }

    /**
     * Writes a collection as its element count followed by every element. A {@code null}
     * collection is written like an empty one, i.e. as a count of zero.
     */
    private <T extends StateObject> void serializeCollection(StateObjectCollection<T> stateObjectCollection, DataOutputStream dataOutputStream, BiConsumerWithException<T, DataOutputStream, IOException> elementWriter) throws IOException {
        if (stateObjectCollection == null) {
            dataOutputStream.writeInt(0);
            return;
        }
        dataOutputStream.writeInt(stateObjectCollection.size());
        for (T element : stateObjectCollection) {
            elementWriter.accept(element, dataOutputStream);
        }
    }

    /**
     * Static test entry point for writing a stream state handle.
     *
     * @param streamStateHandle the handle to write
     * @param dataOutputStream the stream to write to
     * @throws IOException if writing fails
     */
    @VisibleForTesting
    public static void serializeStreamStateHandle(StreamStateHandle streamStateHandle, DataOutputStream dataOutputStream) throws IOException {
        // The qualifier is required: an unqualified call would resolve to this very method.
        MetadataV2V3SerializerBase.serializeStreamStateHandle(streamStateHandle, dataOutputStream);
    }

    /**
     * Static test entry point for reading a stream state handle without a deserialization
     * context.
     *
     * @param dataInputStream the stream to read from
     * @return the handle read from the stream
     * @throws IOException if reading fails
     */
    @VisibleForTesting
    public static StreamStateHandle deserializeStreamStateHandle(DataInputStream dataInputStream) throws IOException {
        return MetadataV2V3SerializerBase.deserializeStreamStateHandle(dataInputStream, null);
    }

    /**
     * Static test entry point for writing an operator state handle through {@link #INSTANCE}.
     *
     * @param operatorStateHandle the handle to write
     * @param dataOutputStream the stream to write to
     * @throws IOException if writing fails
     */
    @VisibleForTesting
    public static void serializeOperatorStateHandleUtil(OperatorStateHandle operatorStateHandle, DataOutputStream dataOutputStream) throws IOException {
        INSTANCE.serializeOperatorStateHandle(operatorStateHandle, dataOutputStream);
    }

    /**
     * Static test entry point for reading an operator state handle through {@link #INSTANCE},
     * without a deserialization context.
     *
     * @param dataInputStream the stream to read from
     * @return the handle read from the stream
     * @throws IOException if reading fails
     */
    @VisibleForTesting
    public static OperatorStateHandle deserializeOperatorStateHandleUtil(DataInputStream dataInputStream) throws IOException {
        return INSTANCE.deserializeOperatorStateHandle(dataInputStream, null);
    }

    /**
     * Static test entry point for writing a keyed state handle.
     *
     * @param keyedStateHandle the handle to write
     * @param dataOutputStream the stream to write to
     * @throws IOException if writing fails
     */
    @VisibleForTesting
    public static void serializeKeyedStateHandleUtil(KeyedStateHandle keyedStateHandle, DataOutputStream dataOutputStream) throws IOException {
        serializeKeyedStateHandle(keyedStateHandle, dataOutputStream);
    }

    /**
     * Static test entry point for reading a keyed state handle without a deserialization
     * context.
     *
     * @param dataInputStream the stream to read from
     * @return the handle read from the stream
     * @throws IOException if reading fails
     */
    @VisibleForTesting
    public static KeyedStateHandle deserializeKeyedStateHandleUtil(DataInputStream dataInputStream) throws IOException {
        return deserializeKeyedStateHandle(dataInputStream, null);
    }

    /**
     * Static test entry point for reading input channel state handles through {@link #INSTANCE},
     * without a deserialization context.
     *
     * @param dataInputStream the stream to read from
     * @return the handles read from the stream
     * @throws IOException if reading fails
     */
    @VisibleForTesting
    public static StateObjectCollection<InputChannelStateHandle> deserializeInputChannelStateHandle(DataInputStream dataInputStream) throws IOException {
        return INSTANCE.deserializeInputChannelStateHandle(dataInputStream, null);
    }

    /**
     * Test entry point for reading result subpartition state handles without a deserialization
     * context.
     *
     * <p>NOTE(review): unlike its input-channel counterpart this is an instance method, yet it
     * delegates to {@link #INSTANCE} rather than to {@code this}. Left unchanged so that
     * existing callers keep working.
     *
     * @param dataInputStream the stream to read from
     * @return the handles read from the stream
     * @throws IOException if reading fails
     */
    @VisibleForTesting
    public StateObjectCollection<ResultSubpartitionStateHandle> deserializeResultSubpartitionStateHandle(DataInputStream dataInputStream) throws IOException {
        return INSTANCE.deserializeResultSubpartitionStateHandle(dataInputStream, null);
    }
}
