package org.apache.flink.runtime.state;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/state/OperatorStateOutputCheckpointStreamTest.class */
class OperatorStateOutputCheckpointStreamTest {
    private static final int STREAM_CAPACITY = 128;

    OperatorStateOutputCheckpointStreamTest() {
    }

    private static OperatorStateCheckpointOutputStream createStream() throws IOException {
        return new OperatorStateCheckpointOutputStream(new TestMemoryCheckpointOutputStream(128));
    }

    private OperatorStateHandle writeAllTestKeyGroups(OperatorStateCheckpointOutputStream operatorStateCheckpointOutputStream, int i) throws Exception {
        DataOutputViewStreamWrapper dataOutputViewStreamWrapper = new DataOutputViewStreamWrapper(operatorStateCheckpointOutputStream);
        for (int i2 = 0; i2 < i; i2++) {
            Assertions.assertThat(operatorStateCheckpointOutputStream.getNumberOfPartitions()).isEqualTo(i2);
            operatorStateCheckpointOutputStream.startNewPartition();
            dataOutputViewStreamWrapper.writeInt(i2);
        }
        return operatorStateCheckpointOutputStream.closeAndGetHandle();
    }

    @Test
    void testCloseNotPropagated() throws Exception {
        OperatorStateCheckpointOutputStream createStream = createStream();
        TestMemoryCheckpointOutputStream delegate = createStream.getDelegate();
        createStream.close();
        Assertions.assertThat(delegate.isClosed()).isFalse();
        delegate.close();
    }

    @Test
    void testEmptyOperatorStream() throws Exception {
        OperatorStateCheckpointOutputStream createStream = createStream();
        TestMemoryCheckpointOutputStream delegate = createStream.getDelegate();
        OperatorStateHandle closeAndGetHandle = createStream.closeAndGetHandle();
        Assertions.assertThat(delegate.isClosed()).isTrue();
        Assertions.assertThat(createStream.getNumberOfPartitions()).isZero();
        Assertions.assertThat(closeAndGetHandle).isNull();
    }

    @Test
    void testWriteReadRoundtrip() throws Exception {
        OperatorStateHandle writeAllTestKeyGroups = writeAllTestKeyGroups(createStream(), 3);
        Assertions.assertThat(writeAllTestKeyGroups).isNotNull();
        Iterator it = writeAllTestKeyGroups.getStateNameToPartitionOffsets().entrySet().iterator();
        while (it.hasNext()) {
            Assertions.assertThat(((OperatorStateHandle.StateMetaInfo) ((Map.Entry) it.next()).getValue()).getDistributionMode()).isEqualTo(OperatorStateHandle.Mode.SPLIT_DISTRIBUTE);
        }
        verifyRead(writeAllTestKeyGroups, 3);
    }

    private static void verifyRead(OperatorStateHandle operatorStateHandle, int i) throws IOException {
        int i2 = 0;
        FSDataInputStream openInputStream = operatorStateHandle.openInputStream();
        try {
            long[] offsets = ((OperatorStateHandle.StateMetaInfo) operatorStateHandle.getStateNameToPartitionOffsets().get("_default_")).getOffsets();
            Assertions.assertThat(offsets).isNotNull();
            DataInputViewStreamWrapper dataInputViewStreamWrapper = new DataInputViewStreamWrapper(openInputStream);
            for (int i3 = 0; i3 < i; i3++) {
                openInputStream.seek(offsets[i3]);
                Assertions.assertThat(dataInputViewStreamWrapper.readInt()).isEqualTo(i3);
                i2++;
            }
            if (openInputStream != null) {
                openInputStream.close();
            }
            Assertions.assertThat(i2).isEqualTo(i);
        } catch (Throwable th) {
            if (openInputStream != null) {
                try {
                    openInputStream.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }
}
