/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint.channel;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.stream.IntStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.core.memory.HeapMemorySegment;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateSerializer;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateSerializerImpl;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.InputChannelStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
import org.apache.flink.util.function.RunnableWithException;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class ChannelStateCheckpointWriterTest {
    private static final RunnableWithException NO_OP_RUNNABLE = () -> {};
    private final Random random = new Random();
    @Rule
    public final TemporaryFolder temporaryFolder = new TemporaryFolder();

    @Test
    public void testFileHandleSize() throws Exception {
        int numChannels = 3;
        int numWritesPerChannel = 4;
        int numBytesPerWrite = 5;
        ChannelStateWriter.ChannelStateWriteResult result = new ChannelStateWriter.ChannelStateWriteResult();
        ChannelStateCheckpointWriter writer = this.createWriter(result, (CheckpointStreamFactory.CheckpointStateOutputStream)new FsCheckpointStreamFactory((FileSystem)LocalFileSystem.getSharedInstance(), Path.fromLocalFile((File)this.temporaryFolder.newFolder("checkpointsDir")), Path.fromLocalFile((File)this.temporaryFolder.newFolder("sharedStateDir")), numBytesPerWrite - 1, numBytesPerWrite - 1).createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE));
        InputChannelInfo[] channels = (InputChannelInfo[])IntStream.range(0, numChannels).mapToObj(i -> new InputChannelInfo(0, i)).toArray(InputChannelInfo[]::new);
        for (int call = 0; call < numWritesPerChannel; ++call) {
            for (int channel = 0; channel < numChannels; ++channel) {
                this.write(writer, channels[channel], this.getData(numBytesPerWrite));
            }
        }
        writer.completeInput();
        writer.completeOutput();
        for (InputChannelStateHandle handle : (Collection)result.inputChannelStateHandles.get()) {
            Assert.assertEquals((long)((4 + numBytesPerWrite) * numWritesPerChannel), (long)handle.getStateSize());
        }
    }

    @Test
    public void testSmallFilesNotWritten() throws Exception {
        int threshold = 100;
        File checkpointsDir = this.temporaryFolder.newFolder("checkpointsDir");
        File sharedStateDir = this.temporaryFolder.newFolder("sharedStateDir");
        FsCheckpointStreamFactory checkpointStreamFactory = new FsCheckpointStreamFactory((FileSystem)LocalFileSystem.getSharedInstance(), Path.fromLocalFile((File)checkpointsDir), Path.fromLocalFile((File)sharedStateDir), threshold, threshold);
        ChannelStateWriter.ChannelStateWriteResult result = new ChannelStateWriter.ChannelStateWriteResult();
        ChannelStateCheckpointWriter writer = this.createWriter(result, (CheckpointStreamFactory.CheckpointStateOutputStream)checkpointStreamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE));
        NetworkBuffer buffer = new NetworkBuffer((MemorySegment)HeapMemorySegment.FACTORY.allocateUnpooledSegment(threshold / 2, null), FreeingBufferRecycler.INSTANCE);
        writer.writeInput(new InputChannelInfo(1, 2), (Buffer)buffer);
        writer.completeOutput();
        writer.completeInput();
        Assert.assertTrue((boolean)result.isDone());
        Assert.assertEquals((long)0L, (long)checkpointsDir.list().length);
        Assert.assertEquals((long)0L, (long)sharedStateDir.list().length);
    }

    @Test
    public void testEmptyState() throws Exception {
        MemCheckpointStreamFactory.MemoryCheckpointOutputStream stream = new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(1000){

            public StreamStateHandle closeAndGetHandle() {
                Assert.fail((String)"closeAndGetHandle shouldn't be called for empty channel state");
                return null;
            }
        };
        ChannelStateCheckpointWriter writer = this.createWriter(new ChannelStateWriter.ChannelStateWriteResult(), (CheckpointStreamFactory.CheckpointStateOutputStream)stream);
        writer.completeOutput();
        writer.completeInput();
        Assert.assertTrue((boolean)stream.isClosed());
    }

    @Test
    public void testRecyclingBuffers() throws Exception {
        ChannelStateCheckpointWriter writer = this.createWriter(new ChannelStateWriter.ChannelStateWriteResult());
        NetworkBuffer buffer = new NetworkBuffer((MemorySegment)HeapMemorySegment.FACTORY.allocateUnpooledSegment(10, null), FreeingBufferRecycler.INSTANCE);
        writer.writeInput(new InputChannelInfo(1, 2), (Buffer)buffer);
        Assert.assertTrue((boolean)buffer.isRecycled());
    }

    @Test
    public void testFlush() throws Exception {
        class FlushRecorder
        extends DataOutputStream {
            private boolean flushed;

            FlushRecorder() {
                super(new ByteArrayOutputStream());
                this.flushed = false;
            }

            @Override
            public void flush() throws IOException {
                this.flushed = true;
                super.flush();
            }
        }
        FlushRecorder dataStream = new FlushRecorder();
        ChannelStateCheckpointWriter writer = new ChannelStateCheckpointWriter(1L, new ChannelStateWriter.ChannelStateWriteResult(), (ChannelStateSerializer)new ChannelStateSerializerImpl(), NO_OP_RUNNABLE, (CheckpointStreamFactory.CheckpointStateOutputStream)new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(42), (DataOutputStream)dataStream);
        writer.completeInput();
        writer.completeOutput();
        Assert.assertTrue((boolean)dataStream.flushed);
    }

    @Test
    public void testResultCompletion() throws Exception {
        ChannelStateWriter.ChannelStateWriteResult result = new ChannelStateWriter.ChannelStateWriteResult();
        ChannelStateCheckpointWriter writer = this.createWriter(result);
        writer.completeInput();
        Assert.assertFalse((boolean)result.isDone());
        writer.completeOutput();
        Assert.assertTrue((boolean)result.isDone());
    }

    @Test
    public void testRecordingOffsets() throws Exception {
        HashMap<InputChannelInfo, Integer> offsetCounts = new HashMap<InputChannelInfo, Integer>();
        offsetCounts.put(new InputChannelInfo(1, 1), 1);
        offsetCounts.put(new InputChannelInfo(1, 2), 2);
        offsetCounts.put(new InputChannelInfo(1, 3), 5);
        int numBytes = 100;
        ChannelStateWriter.ChannelStateWriteResult result = new ChannelStateWriter.ChannelStateWriteResult();
        ChannelStateCheckpointWriter writer = this.createWriter(result);
        for (Map.Entry e : offsetCounts.entrySet()) {
            for (int i = 0; i < (Integer)e.getValue(); ++i) {
                this.write(writer, (InputChannelInfo)e.getKey(), this.getData(numBytes));
            }
        }
        writer.completeInput();
        writer.completeOutput();
        for (InputChannelStateHandle handle : (Collection)result.inputChannelStateHandles.get()) {
            int headerSize = 4;
            int lengthSize = 4;
            Assert.assertEquals(Collections.singletonList(Long.valueOf(headerSize)), (Object)handle.getOffsets());
            Assert.assertEquals((long)(headerSize + lengthSize + numBytes * (Integer)offsetCounts.remove(handle.getInfo())), (long)handle.getDelegate().getStateSize());
        }
        Assert.assertTrue((boolean)offsetCounts.isEmpty());
    }

    private byte[] getData(int len) {
        byte[] bytes = new byte[len];
        this.random.nextBytes(bytes);
        return bytes;
    }

    private void write(ChannelStateCheckpointWriter writer, InputChannelInfo channelInfo, byte[] data) throws Exception {
        MemorySegment segment = MemorySegmentFactory.wrap((byte[])data);
        NetworkBuffer buffer = new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE, Buffer.DataType.DATA_BUFFER, segment.size());
        writer.writeInput(channelInfo, (Buffer)buffer);
    }

    private ChannelStateCheckpointWriter createWriter(ChannelStateWriter.ChannelStateWriteResult result) throws Exception {
        return this.createWriter(result, (CheckpointStreamFactory.CheckpointStateOutputStream)new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(1000));
    }

    private ChannelStateCheckpointWriter createWriter(ChannelStateWriter.ChannelStateWriteResult result, CheckpointStreamFactory.CheckpointStateOutputStream stream) throws Exception {
        return new ChannelStateCheckpointWriter(1L, result, stream, (ChannelStateSerializer)new ChannelStateSerializerImpl(), NO_OP_RUNNABLE);
    }
}

