/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.kafka.source.enumerator;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.connector.base.source.utils.SerdeUtils;
import org.apache.flink.connector.kafka.source.enumerator.AssignmentStatus;
import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState;
import org.apache.flink.connector.kafka.source.enumerator.TopicPartitionAndAssignmentStatus;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitSerializer;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.kafka.common.TopicPartition;

@Internal
public class KafkaSourceEnumStateSerializer
implements SimpleVersionedSerializer<KafkaSourceEnumState> {
    private static final int VERSION_0 = 0;
    private static final int VERSION_1 = 1;
    private static final int VERSION_2 = 2;
    private static final int CURRENT_VERSION = 2;

    public int getVersion() {
        return 2;
    }

    public byte[] serialize(KafkaSourceEnumState enumState) throws IOException {
        Set<TopicPartitionAndAssignmentStatus> partitions = enumState.partitions();
        boolean initialDiscoveryFinished = enumState.initialDiscoveryFinished();
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();){
            Object object;
            try (DataOutputStream out = new DataOutputStream(baos);){
                out.writeInt(partitions.size());
                for (TopicPartitionAndAssignmentStatus topicPartitionAndAssignmentStatus : partitions) {
                    out.writeUTF(topicPartitionAndAssignmentStatus.topicPartition().topic());
                    out.writeInt(topicPartitionAndAssignmentStatus.topicPartition().partition());
                    out.writeInt(topicPartitionAndAssignmentStatus.assignmentStatus().getStatusCode());
                }
                out.writeBoolean(initialDiscoveryFinished);
                out.flush();
                object = baos.toByteArray();
            }
            return object;
        }
    }

    public KafkaSourceEnumState deserialize(int version, byte[] serialized) throws IOException {
        switch (version) {
            case 2: {
                return KafkaSourceEnumStateSerializer.deserializeTopicPartitionAndAssignmentStatus(serialized);
            }
            case 1: {
                return KafkaSourceEnumStateSerializer.deserializeAssignedTopicPartitions(serialized);
            }
            case 0: {
                Map currentPartitionAssignment = SerdeUtils.deserializeSplitAssignments((byte[])serialized, (SimpleVersionedSerializer)new KafkaPartitionSplitSerializer(), HashSet::new);
                HashSet<TopicPartition> currentAssignedSplits = new HashSet<TopicPartition>();
                currentPartitionAssignment.forEach((reader, splits) -> splits.forEach(split -> currentAssignedSplits.add(split.getTopicPartition())));
                return new KafkaSourceEnumState(currentAssignedSplits, new HashSet<TopicPartition>(), true);
            }
        }
        throw new IOException(String.format("The bytes are serialized with version %d, while this deserializer only supports version up to %d", version, 2));
    }

    private static KafkaSourceEnumState deserializeAssignedTopicPartitions(byte[] serializedTopicPartitions) throws IOException {
        try (ByteArrayInputStream bais = new ByteArrayInputStream(serializedTopicPartitions);){
            KafkaSourceEnumState kafkaSourceEnumState;
            try (DataInputStream in = new DataInputStream(bais);){
                int numPartitions = in.readInt();
                HashSet<TopicPartitionAndAssignmentStatus> partitions = new HashSet<TopicPartitionAndAssignmentStatus>(numPartitions);
                for (int i = 0; i < numPartitions; ++i) {
                    String topic = in.readUTF();
                    int partition = in.readInt();
                    partitions.add(new TopicPartitionAndAssignmentStatus(new TopicPartition(topic, partition), AssignmentStatus.ASSIGNED));
                }
                if (in.available() > 0) {
                    throw new IOException("Unexpected trailing bytes in serialized topic partitions");
                }
                kafkaSourceEnumState = new KafkaSourceEnumState(partitions, true);
            }
            return kafkaSourceEnumState;
        }
    }

    private static KafkaSourceEnumState deserializeTopicPartitionAndAssignmentStatus(byte[] serialized) throws IOException {
        try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);){
            KafkaSourceEnumState kafkaSourceEnumState;
            try (DataInputStream in = new DataInputStream(bais);){
                int numPartitions = in.readInt();
                HashSet<TopicPartitionAndAssignmentStatus> partitions = new HashSet<TopicPartitionAndAssignmentStatus>(numPartitions);
                for (int i = 0; i < numPartitions; ++i) {
                    String topic = in.readUTF();
                    int partition = in.readInt();
                    int statusCode = in.readInt();
                    partitions.add(new TopicPartitionAndAssignmentStatus(new TopicPartition(topic, partition), AssignmentStatus.ofStatusCode(statusCode)));
                }
                boolean initialDiscoveryFinished = in.readBoolean();
                if (in.available() > 0) {
                    throw new IOException("Unexpected trailing bytes in serialized topic partitions");
                }
                kafkaSourceEnumState = new KafkaSourceEnumState(partitions, initialDiscoveryFinished);
            }
            return kafkaSourceEnumState;
        }
    }

    @VisibleForTesting
    public static byte[] serializeTopicPartitions(Collection<TopicPartition> topicPartitions) throws IOException {
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();){
            Object object;
            try (DataOutputStream out = new DataOutputStream(baos);){
                out.writeInt(topicPartitions.size());
                for (TopicPartition tp : topicPartitions) {
                    out.writeUTF(tp.topic());
                    out.writeInt(tp.partition());
                }
                out.flush();
                object = baos.toByteArray();
            }
            return object;
        }
    }
}

