package org.apache.flink.runtime.checkpoint.channel;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.stream.IntStream;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.InputChannelStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
import org.apache.flink.util.function.RunnableWithException;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriterTest.class */
public class ChannelStateCheckpointWriterTest {
    private static final RunnableWithException NO_OP_RUNNABLE = () -> {
    };
    private final Random random = new Random();

    @Rule
    public final TemporaryFolder temporaryFolder = new TemporaryFolder();

    @Test
    public void testFileHandleSize() throws Exception {
        ChannelStateWriter.ChannelStateWriteResult channelStateWriteResult = new ChannelStateWriter.ChannelStateWriteResult();
        ChannelStateCheckpointWriter createWriter = createWriter(channelStateWriteResult, new FsCheckpointStreamFactory(LocalFileSystem.getSharedInstance(), Path.fromLocalFile(this.temporaryFolder.newFolder("checkpointsDir")), Path.fromLocalFile(this.temporaryFolder.newFolder("sharedStateDir")), 5 - 1, 5 - 1).createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE));
        InputChannelInfo[] inputChannelInfoArr = (InputChannelInfo[]) IntStream.range(0, 3).mapToObj(i -> {
            return new InputChannelInfo(0, i);
        }).toArray(i2 -> {
            return new InputChannelInfo[i2];
        });
        for (int i3 = 0; i3 < 4; i3++) {
            for (int i4 = 0; i4 < 3; i4++) {
                write(createWriter, inputChannelInfoArr[i4], getData(5));
            }
        }
        createWriter.completeInput();
        createWriter.completeOutput();
        Iterator it = ((Collection) channelStateWriteResult.inputChannelStateHandles.get()).iterator();
        while (it.hasNext()) {
            Assert.assertEquals((4 + 5) * 4, ((InputChannelStateHandle) it.next()).getStateSize());
        }
    }

    @Test
    public void testSmallFilesNotWritten() throws Exception {
        FsCheckpointStreamFactory fsCheckpointStreamFactory = new FsCheckpointStreamFactory(LocalFileSystem.getSharedInstance(), Path.fromLocalFile(this.temporaryFolder.newFolder("checkpointsDir")), Path.fromLocalFile(this.temporaryFolder.newFolder("sharedStateDir")), 100, 100);
        ChannelStateWriter.ChannelStateWriteResult channelStateWriteResult = new ChannelStateWriter.ChannelStateWriteResult();
        ChannelStateCheckpointWriter createWriter = createWriter(channelStateWriteResult, fsCheckpointStreamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE));
        createWriter.writeInput(new InputChannelInfo(1, 2), new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(100 / 2), FreeingBufferRecycler.INSTANCE));
        createWriter.completeOutput();
        createWriter.completeInput();
        Assert.assertTrue(channelStateWriteResult.isDone());
        Assert.assertEquals(0L, r0.list().length);
        Assert.assertEquals(0L, r0.list().length);
    }

    @Test
    public void testEmptyState() throws Exception {
        MemCheckpointStreamFactory.MemoryCheckpointOutputStream memoryCheckpointOutputStream = new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(1000) { // from class: org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriterTest.1
            public StreamStateHandle closeAndGetHandle() {
                Assert.fail("closeAndGetHandle shouldn't be called for empty channel state");
                return null;
            }
        };
        ChannelStateCheckpointWriter createWriter = createWriter(new ChannelStateWriter.ChannelStateWriteResult(), memoryCheckpointOutputStream);
        createWriter.completeOutput();
        createWriter.completeInput();
        Assert.assertTrue(memoryCheckpointOutputStream.isClosed());
    }

    @Test
    public void testRecyclingBuffers() throws Exception {
        ChannelStateCheckpointWriter createWriter = createWriter(new ChannelStateWriter.ChannelStateWriteResult());
        NetworkBuffer networkBuffer = new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(10, (Object) null), FreeingBufferRecycler.INSTANCE);
        createWriter.writeInput(new InputChannelInfo(1, 2), networkBuffer);
        Assert.assertTrue(networkBuffer.isRecycled());
    }

    /* JADX WARN: Type inference failed for: r0v0, types: [java.io.DataOutputStream, org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriterTest$1FlushRecorder] */
    @Test
    public void testFlush() throws Exception {
        ?? r0 = new DataOutputStream() { // from class: org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriterTest.1FlushRecorder
            private boolean flushed;

            {
                new ByteArrayOutputStream();
                this.flushed = false;
            }

            @Override // java.io.DataOutputStream, java.io.FilterOutputStream, java.io.OutputStream, java.io.Flushable
            public void flush() throws IOException {
                this.flushed = true;
                super.flush();
            }
        };
        ChannelStateCheckpointWriter channelStateCheckpointWriter = new ChannelStateCheckpointWriter("dummy task", 0, 1L, new ChannelStateWriter.ChannelStateWriteResult(), new ChannelStateSerializerImpl(), NO_OP_RUNNABLE, new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(42), (DataOutputStream) r0);
        channelStateCheckpointWriter.completeInput();
        channelStateCheckpointWriter.completeOutput();
        Assert.assertTrue(((C1FlushRecorder) r0).flushed);
    }

    @Test
    public void testResultCompletion() throws Exception {
        ChannelStateWriter.ChannelStateWriteResult channelStateWriteResult = new ChannelStateWriter.ChannelStateWriteResult();
        ChannelStateCheckpointWriter createWriter = createWriter(channelStateWriteResult);
        createWriter.completeInput();
        Assert.assertFalse(channelStateWriteResult.isDone());
        createWriter.completeOutput();
        Assert.assertTrue(channelStateWriteResult.isDone());
    }

    @Test
    public void testRecordingOffsets() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put(new InputChannelInfo(1, 1), 1);
        hashMap.put(new InputChannelInfo(1, 2), 2);
        hashMap.put(new InputChannelInfo(1, 3), 5);
        ChannelStateWriter.ChannelStateWriteResult channelStateWriteResult = new ChannelStateWriter.ChannelStateWriteResult();
        ChannelStateCheckpointWriter createWriter = createWriter(channelStateWriteResult);
        for (Map.Entry entry : hashMap.entrySet()) {
            for (int i = 0; i < ((Integer) entry.getValue()).intValue(); i++) {
                write(createWriter, (InputChannelInfo) entry.getKey(), getData(100));
            }
        }
        createWriter.completeInput();
        createWriter.completeOutput();
        for (InputChannelStateHandle inputChannelStateHandle : (Collection) channelStateWriteResult.inputChannelStateHandles.get()) {
            Assert.assertEquals(Collections.singletonList(Long.valueOf(4)), inputChannelStateHandle.getOffsets());
            Assert.assertEquals(4 + 4 + (100 * ((Integer) hashMap.remove(inputChannelStateHandle.getInfo())).intValue()), inputChannelStateHandle.getDelegate().getStateSize());
        }
        Assert.assertTrue(hashMap.isEmpty());
    }

    private byte[] getData(int i) {
        byte[] bArr = new byte[i];
        this.random.nextBytes(bArr);
        return bArr;
    }

    private void write(ChannelStateCheckpointWriter channelStateCheckpointWriter, InputChannelInfo inputChannelInfo, byte[] bArr) throws Exception {
        MemorySegment wrap = MemorySegmentFactory.wrap(bArr);
        channelStateCheckpointWriter.writeInput(inputChannelInfo, new NetworkBuffer(wrap, FreeingBufferRecycler.INSTANCE, Buffer.DataType.DATA_BUFFER, wrap.size()));
    }

    private ChannelStateCheckpointWriter createWriter(ChannelStateWriter.ChannelStateWriteResult channelStateWriteResult) throws Exception {
        return createWriter(channelStateWriteResult, new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(1000));
    }

    private ChannelStateCheckpointWriter createWriter(ChannelStateWriter.ChannelStateWriteResult channelStateWriteResult, CheckpointStateOutputStream checkpointStateOutputStream) throws Exception {
        return new ChannelStateCheckpointWriter("dummy task", 0, 1L, channelStateWriteResult, checkpointStateOutputStream, new ChannelStateSerializerImpl(), NO_OP_RUNNABLE);
    }
}
