/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.sink;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
import org.apache.flink.runtime.metrics.groups.MetricsGroupTestUtils;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.runtime.operators.sink.GlobalCommittableWrapper;
import org.apache.flink.streaming.runtime.operators.sink.GlobalCommitterSerializer;
import org.apache.flink.streaming.runtime.operators.sink.IntegerSerializer;
import org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManager;
import org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector;
import org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollectorSerializer;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectAssert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

class GlobalCommitterSerializerTest {
    private static final int SUBTASK_ID = 0;
    private static final int NUMBER_OF_SUBTASKS = 1;
    private static final SinkCommitterMetricGroup METRIC_GROUP = MetricsGroupTestUtils.mockCommitterMetricGroup();
    private static final CommittableCollectorSerializer<Integer> COMMITTABLE_COLLECTOR_SERIALIZER = new CommittableCollectorSerializer((SimpleVersionedSerializer)new IntegerSerializer(), 0, 1, METRIC_GROUP);
    private static final GlobalCommitterSerializer<Integer, String> SERIALIZER = new GlobalCommitterSerializer(COMMITTABLE_COLLECTOR_SERIALIZER, (SimpleVersionedSerializer)new StringSerializer(), METRIC_GROUP);

    GlobalCommitterSerializerTest() {
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    void testSerDe(boolean withSinkV1State) throws IOException {
        GlobalCommitterSerializer serializer = new GlobalCommitterSerializer(COMMITTABLE_COLLECTOR_SERIALIZER, (SimpleVersionedSerializer)(withSinkV1State ? new StringSerializer() : null), METRIC_GROUP);
        CommittableCollector collector = new CommittableCollector(METRIC_GROUP);
        collector.addMessage((CommittableMessage)new CommittableSummary(2, 3, 1L, 1, 1, 0));
        collector.addMessage((CommittableMessage)new CommittableWithLineage((Object)1, 1L, 2));
        List v1State = withSinkV1State ? Arrays.asList("first", "second") : Collections.emptyList();
        GlobalCommittableWrapper wrapper = new GlobalCommittableWrapper(collector, v1State);
        GlobalCommittableWrapper copy = serializer.deserialize(2, serializer.serialize(wrapper));
        Assertions.assertThat((Collection)copy.getGlobalCommittables()).containsExactlyInAnyOrderElementsOf(v1State);
        ((ObjectAssert)((ObjectAssert)Assertions.assertThat((Collection)collector.getCheckpointCommittablesUpTo(2L)).singleElement()).returns((Object)1L, CheckpointCommittableManager::getCheckpointId)).returns((Object)3, CheckpointCommittableManager::getNumberOfSubtasks);
    }

    @Test
    void testDeserializationV1() throws IOException {
        DataOutputSerializer out = new DataOutputSerializer(256);
        StringSerializer stringSerializer = new StringSerializer();
        out.writeInt(-1189141204);
        out.writeInt(1);
        out.writeInt(2);
        String state1 = "legacy1";
        out.writeInt(stringSerializer.serialize(state1).length);
        out.writeUTF(state1);
        String state2 = "legacy2";
        out.writeInt(stringSerializer.serialize(state2).length);
        out.writeUTF(state2);
        byte[] serialized = out.getCopyOfBuffer();
        GlobalCommittableWrapper wrapper = SERIALIZER.deserialize(1, serialized);
        Assertions.assertThat((Collection)wrapper.getGlobalCommittables()).containsExactlyInAnyOrder((Object[])new String[]{state1, state2});
        CommittableCollector collector = wrapper.getCommittableCollector();
        Assertions.assertThat((Collection)collector.getCheckpointCommittablesUpTo(Long.MAX_VALUE)).isEmpty();
    }

    private static class StringSerializer
    implements SimpleVersionedSerializer<String> {
        private StringSerializer() {
        }

        public int getVersion() {
            return 1;
        }

        public byte[] serialize(String obj) throws IOException {
            DataOutputSerializer out = new DataOutputSerializer(256);
            out.writeUTF(obj);
            return out.getCopyOfBuffer();
        }

        public String deserialize(int version, byte[] serialized) throws IOException {
            DataInputDeserializer in = new DataInputDeserializer(serialized);
            return in.readUTF();
        }
    }
}

