/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Iterator;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
import org.apache.flink.runtime.state.TestMemoryCheckpointOutputStream;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link KeyedStateCheckpointOutputStream}: close semantics, handle creation, key-group
 * bookkeeping, and write/read round trips through the resulting {@link KeyGroupsStateHandle}.
 */
class KeyedStateCheckpointOutputStreamTest {
    /** Capacity, in bytes, of the in-memory stream that backs every stream under test. */
    private static final int STREAM_CAPACITY = 128;

    /**
     * Creates a keyed-state stream for the given range on top of a fresh in-memory checkpoint
     * stream.
     *
     * @param keyGroupRange the key-group range the new stream is created for
     * @return a new stream whose delegate is a {@link TestMemoryCheckpointOutputStream}
     */
    private static KeyedStateCheckpointOutputStream createStream(KeyGroupRange keyGroupRange) {
        TestMemoryCheckpointOutputStream checkStream =
                new TestMemoryCheckpointOutputStream(STREAM_CAPACITY);
        return new KeyedStateCheckpointOutputStream(checkStream, keyGroupRange);
    }

    /**
     * Starts every key group of {@code keyRange} on {@code stream}, writes the key-group id as a
     * single int into each one, and closes the stream.
     *
     * @param stream the stream to write to
     * @param keyRange the key groups to write
     * @return the handle returned by {@code closeAndGetHandle()}
     * @throws Exception if writing to or closing the stream fails
     */
    private static KeyGroupsStateHandle writeAllTestKeyGroups(
            KeyedStateCheckpointOutputStream stream, KeyGroupRange keyRange) throws Exception {
        DataOutputViewStreamWrapper dov = new DataOutputViewStreamWrapper(stream);
        for (int kg : keyRange) {
            stream.startNewKeyGroup(kg);
            dov.writeInt(kg);
        }
        return stream.closeAndGetHandle();
    }

    /** A plain {@code close()} on the keyed stream must leave the delegate stream open. */
    @Test
    void testCloseNotPropagated() throws Exception {
        KeyedStateCheckpointOutputStream stream = createStream(new KeyGroupRange(0, 0));
        TestMemoryCheckpointOutputStream innerStream =
                (TestMemoryCheckpointOutputStream) stream.getDelegate();
        stream.close();
        Assertions.assertThat(innerStream.isClosed()).isFalse();
    }

    /**
     * Calling {@code closeAndGetHandle()} without having started any key group closes the delegate
     * and yields a {@code null} handle.
     */
    @Test
    void testEmptyKeyedStream() throws Exception {
        KeyGroupRange keyRange = new KeyGroupRange(0, 2);
        KeyedStateCheckpointOutputStream stream = createStream(keyRange);
        TestMemoryCheckpointOutputStream innerStream =
                (TestMemoryCheckpointOutputStream) stream.getDelegate();
        KeyGroupsStateHandle emptyHandle = stream.closeAndGetHandle();
        Assertions.assertThat(innerStream.isClosed()).isTrue();
        Assertions.assertThat(emptyHandle).isNull();
    }

    /** Data written for every key group can be read back through the returned handle. */
    @Test
    void testWriteReadRoundtrip() throws Exception {
        KeyGroupRange keyRange = new KeyGroupRange(0, 2);
        KeyedStateCheckpointOutputStream stream = createStream(keyRange);
        KeyGroupsStateHandle fullHandle = writeAllTestKeyGroups(stream, keyRange);
        Assertions.assertThat(fullHandle).isNotNull();
        verifyRead(fullHandle, keyRange);
    }

    /**
     * Checks the started/finished bookkeeping while key groups are written one after another, the
     * rejection of a key group outside the range, and the rejection of new key groups once the
     * stream has been closed.
     */
    @Test
    void testWriteKeyGroupTracking() throws Exception {
        KeyGroupRange keyRange = new KeyGroupRange(0, 2);
        KeyedStateCheckpointOutputStream stream = createStream(keyRange);

        // A key group outside [0, 2] is rejected and must not become the current key group.
        Assertions.assertThatThrownBy(() -> stream.startNewKeyGroup(4711))
                .isInstanceOf(IllegalArgumentException.class);
        Assertions.assertThat(stream.getCurrentKeyGroup()).isEqualTo(-1);

        DataOutputViewStreamWrapper dov = new DataOutputViewStreamWrapper(stream);
        int previous = -1; // -1 marks "no key group written yet"
        for (int kg : keyRange) {
            Assertions.assertThat(stream.isKeyGroupAlreadyStarted(kg)).isFalse();
            Assertions.assertThat(stream.isKeyGroupAlreadyFinished(kg)).isFalse();
            stream.startNewKeyGroup(kg);
            if (-1 != previous) {
                // Starting the next key group finishes the one written before it.
                Assertions.assertThat(stream.isKeyGroupAlreadyStarted(previous)).isTrue();
                Assertions.assertThat(stream.isKeyGroupAlreadyFinished(previous)).isTrue();
            }
            Assertions.assertThat(stream.isKeyGroupAlreadyStarted(kg)).isTrue();
            Assertions.assertThat(stream.isKeyGroupAlreadyFinished(kg)).isFalse();
            dov.writeInt(kg);
            previous = kg;
        }

        KeyGroupsStateHandle fullHandle = stream.closeAndGetHandle();
        verifyRead(fullHandle, keyRange);

        // After closeAndGetHandle() no key group may be started any more.
        for (int kg : keyRange) {
            Assertions.assertThatThrownBy(() -> stream.startNewKeyGroup(kg))
                    .isInstanceOf(IOException.class);
        }
    }

    /**
     * Writes only key group 1 out of the range [0, 2] and verifies that exactly one key group can
     * be read back, carrying the value that was written.
     */
    @Test
    void testReadWriteMissingKeyGroups() throws Exception {
        KeyGroupRange keyRange = new KeyGroupRange(0, 2);
        KeyedStateCheckpointOutputStream stream = createStream(keyRange);
        DataOutputViewStreamWrapper dov = new DataOutputViewStreamWrapper(stream);
        stream.startNewKeyGroup(1);
        dov.writeInt(1);
        KeyGroupsStateHandle fullHandle = stream.closeAndGetHandle();

        int count = 0;
        try (FSDataInputStream in = fullHandle.openInputStream()) {
            DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(in);
            for (int kg : fullHandle.getKeyGroupRange()) {
                long off = fullHandle.getOffsetForKeyGroup(kg);
                if (off < 0L) {
                    // This test treats a negative offset as "nothing to read for this key group".
                    continue;
                }
                in.seek(off);
                Assertions.assertThat(div.readInt()).isOne();
                ++count;
            }
        }
        Assertions.assertThat(count).isOne();
    }

    /**
     * Reads one int at the recorded offset of every key group in the handle's range and asserts
     * that it equals the key-group id, then asserts that the number of key groups read matches the
     * size of {@code keyRange}.
     *
     * @param fullHandle the handle to read from
     * @param keyRange the range whose number of key groups is expected to have been read
     * @throws IOException if opening, seeking, or reading the state stream fails
     */
    private static void verifyRead(KeyGroupsStateHandle fullHandle, KeyGroupRange keyRange)
            throws IOException {
        int count = 0;
        try (FSDataInputStream in = fullHandle.openInputStream()) {
            DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(in);
            for (int kg : fullHandle.getKeyGroupRange()) {
                long off = fullHandle.getOffsetForKeyGroup(kg);
                in.seek(off);
                Assertions.assertThat(div.readInt()).isEqualTo(kg);
                ++count;
            }
        }
        Assertions.assertThat(count).isEqualTo(keyRange.getNumberOfKeyGroups());
    }
}

