package org.apache.flink.streaming.runtime.operators.sink.committables;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
import org.apache.flink.shaded.io.airlift.compress.zstd.Huffman;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.Preconditions;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.class */
public final class CommittableCollectorSerializer<CommT> implements SimpleVersionedSerializer<CommittableCollector<CommT>> {
    private static final int MAGIC_NUMBER = -1189141204;
    private final SimpleVersionedSerializer<CommT> committableSerializer;
    private final int owningSubtaskId;
    private final int owningNumberOfSubtasks;
    private final SinkCommitterMetricGroup metricGroup;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer$CheckpointSimpleVersionedSerializer.class */
    public class CheckpointSimpleVersionedSerializer implements SimpleVersionedSerializer<CheckpointCommittableManagerImpl<CommT>> {
        private CheckpointSimpleVersionedSerializer() {
        }

        public int getVersion() {
            return 1;
        }

        public byte[] serialize(CheckpointCommittableManagerImpl<CommT> checkpointCommittableManagerImpl) throws IOException {
            DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(Huffman.MAX_SYMBOL_COUNT);
            dataOutputSerializer.writeLong(checkpointCommittableManagerImpl.getCheckpointId());
            dataOutputSerializer.writeInt(checkpointCommittableManagerImpl.getNumberOfSubtasks());
            SimpleVersionedSerialization.writeVersionAndSerializeList(new SubtaskSimpleVersionedSerializer(), new ArrayList(checkpointCommittableManagerImpl.getSubtaskCommittableManagers()), dataOutputSerializer);
            return dataOutputSerializer.getCopyOfBuffer();
        }

        /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
        public CheckpointCommittableManagerImpl<CommT> m1021deserialize(int i, byte[] bArr) throws IOException {
            DataInputDeserializer dataInputDeserializer = new DataInputDeserializer(bArr);
            long readLong = dataInputDeserializer.readLong();
            int readInt = i == 0 ? CommittableCollectorSerializer.this.owningNumberOfSubtasks : dataInputDeserializer.readInt();
            List<SubtaskCommittableManager> readVersionAndDeserializeList = SimpleVersionedSerialization.readVersionAndDeserializeList(new SubtaskSimpleVersionedSerializer(readLong), dataInputDeserializer);
            HashMap newHashMapWithExpectedSize = CollectionUtil.newHashMapWithExpectedSize(readVersionAndDeserializeList.size());
            for (SubtaskCommittableManager subtaskCommittableManager : readVersionAndDeserializeList) {
                newHashMapWithExpectedSize.merge(Integer.valueOf(subtaskCommittableManager.getSubtaskId()), subtaskCommittableManager, (v0, v1) -> {
                    return v0.merge(v1);
                });
            }
            return new CheckpointCommittableManagerImpl<>(newHashMapWithExpectedSize, readInt, readLong, CommittableCollectorSerializer.this.metricGroup);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer$SubtaskSimpleVersionedSerializer.class */
    public class SubtaskSimpleVersionedSerializer implements SimpleVersionedSerializer<SubtaskCommittableManager<CommT>> {

        @Nullable
        private final Long checkpointId;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer$SubtaskSimpleVersionedSerializer$RequestSimpleVersionedSerializer.class */
        public class RequestSimpleVersionedSerializer implements SimpleVersionedSerializer<CommitRequestImpl<CommT>> {
            private RequestSimpleVersionedSerializer() {
            }

            public int getVersion() {
                return 0;
            }

            public byte[] serialize(CommitRequestImpl<CommT> commitRequestImpl) throws IOException {
                DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(Huffman.MAX_SYMBOL_COUNT);
                SimpleVersionedSerialization.writeVersionAndSerialize(CommittableCollectorSerializer.this.committableSerializer, commitRequestImpl.getCommittable(), dataOutputSerializer);
                dataOutputSerializer.writeInt(commitRequestImpl.getNumberOfRetries());
                dataOutputSerializer.writeInt(commitRequestImpl.getState().ordinal());
                return dataOutputSerializer.getCopyOfBuffer();
            }

            /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
            public CommitRequestImpl<CommT> m1023deserialize(int i, byte[] bArr) throws IOException {
                DataInputDeserializer dataInputDeserializer = new DataInputDeserializer(bArr);
                return new CommitRequestImpl<>(SimpleVersionedSerialization.readVersionAndDeSerialize(CommittableCollectorSerializer.this.committableSerializer, dataInputDeserializer), dataInputDeserializer.readInt(), CommitRequestState.values()[dataInputDeserializer.readInt()], CommittableCollectorSerializer.this.metricGroup);
            }
        }

        public SubtaskSimpleVersionedSerializer(long j) {
            this.checkpointId = Long.valueOf(j);
        }

        public SubtaskSimpleVersionedSerializer() {
            this.checkpointId = null;
        }

        public int getVersion() {
            return 2;
        }

        public byte[] serialize(SubtaskCommittableManager<CommT> subtaskCommittableManager) throws IOException {
            DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(Huffman.MAX_SYMBOL_COUNT);
            dataOutputSerializer.writeInt(subtaskCommittableManager.getSubtaskId());
            SimpleVersionedSerialization.writeVersionAndSerializeList(new RequestSimpleVersionedSerializer(), new ArrayList(subtaskCommittableManager.getRequests()), dataOutputSerializer);
            dataOutputSerializer.writeInt(subtaskCommittableManager.getNumCommittables());
            dataOutputSerializer.writeInt(subtaskCommittableManager.getNumFailed());
            return dataOutputSerializer.getCopyOfBuffer();
        }

        /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
        public SubtaskCommittableManager<CommT> m1022deserialize(int i, byte[] bArr) throws IOException {
            DataInputDeserializer dataInputDeserializer = new DataInputDeserializer(bArr);
            return new SubtaskCommittableManager<>(SimpleVersionedSerialization.readVersionAndDeserializeList(new RequestSimpleVersionedSerializer(), dataInputDeserializer), dataInputDeserializer.readInt(), i >= 2 ? 0 : dataInputDeserializer.readInt(), dataInputDeserializer.readInt(), i == 0 ? CommittableCollectorSerializer.this.owningSubtaskId : dataInputDeserializer.readInt(), ((Long) Preconditions.checkNotNull(this.checkpointId, "CheckpointId must be set to align the SubtaskCommittableManager with holding CheckpointCommittableManager.")).longValue(), CommittableCollectorSerializer.this.metricGroup);
        }
    }

    public CommittableCollectorSerializer(SimpleVersionedSerializer<CommT> simpleVersionedSerializer, int i, int i2, SinkCommitterMetricGroup sinkCommitterMetricGroup) {
        this.committableSerializer = (SimpleVersionedSerializer) Preconditions.checkNotNull(simpleVersionedSerializer);
        this.owningSubtaskId = i;
        this.owningNumberOfSubtasks = i2;
        this.metricGroup = sinkCommitterMetricGroup;
    }

    public int getVersion() {
        return 2;
    }

    public byte[] serialize(CommittableCollector<CommT> committableCollector) throws IOException {
        DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(Huffman.MAX_SYMBOL_COUNT);
        dataOutputSerializer.writeInt(-1189141204);
        serializeV2(committableCollector, dataOutputSerializer);
        return dataOutputSerializer.getCopyOfBuffer();
    }

    /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
    public CommittableCollector<CommT> m1020deserialize(int i, byte[] bArr) throws IOException {
        DataInputDeserializer dataInputDeserializer = new DataInputDeserializer(bArr);
        if (i == 1) {
            return deserializeV1(dataInputDeserializer);
        }
        if (i != 2) {
            throw new IOException("Unrecognized version or corrupt state: " + i);
        }
        validateMagicNumber(dataInputDeserializer);
        return deserializeV2(dataInputDeserializer);
    }

    private CommittableCollector<CommT> deserializeV1(DataInputView dataInputView) throws IOException {
        return CommittableCollector.ofLegacy(SinkV1CommittableDeserializer.readVersionAndDeserializeList(this.committableSerializer, dataInputView), this.metricGroup);
    }

    private void serializeV2(CommittableCollector<CommT> committableCollector, DataOutputView dataOutputView) throws IOException {
        SimpleVersionedSerialization.writeVersionAndSerializeList(new CheckpointSimpleVersionedSerializer(), new ArrayList(committableCollector.getCheckpointCommittables()), dataOutputView);
    }

    private CommittableCollector<CommT> deserializeV2(DataInputDeserializer dataInputDeserializer) throws IOException {
        return new CommittableCollector<>((Map) SimpleVersionedSerialization.readVersionAndDeserializeList(new CheckpointSimpleVersionedSerializer(), dataInputDeserializer).stream().collect(Collectors.toMap((v0) -> {
            return v0.getCheckpointId();
        }, checkpointCommittableManagerImpl -> {
            return checkpointCommittableManagerImpl;
        })), this.metricGroup);
    }

    private static void validateMagicNumber(DataInputView dataInputView) throws IOException {
        int readInt = dataInputView.readInt();
        if (readInt != -1189141204) {
            throw new IOException(String.format("Corrupt data: Unexpected magic number %08X", Integer.valueOf(readInt)));
        }
    }
}
