package org.apache.flink.runtime.checkpoint.channel;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.stream.IntStream;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.HashBufferAccumulatorTest;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.InputChannelStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.function.RunnableWithException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriterTest.class */
class ChannelStateCheckpointWriterTest {
    private static final RunnableWithException NO_OP_RUNNABLE;
    private final Random random = new Random();
    private static final JobVertexID JOB_VERTEX_ID;
    private static final int SUBTASK_INDEX = 0;
    private static final SubtaskID SUBTASK_ID;

    @TempDir
    private Path temporaryFolder;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* renamed from: org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriterTest$1FlushRecorder, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriterTest$1FlushRecorder.class */
    class C1FlushRecorder extends DataOutputStream {
        private boolean flushed;

        private C1FlushRecorder() {
            super(new ByteArrayOutputStream());
            this.flushed = false;
        }

        @Override // java.io.DataOutputStream, java.io.FilterOutputStream, java.io.OutputStream, java.io.Flushable
        public void flush() throws IOException {
            this.flushed = true;
            super.flush();
        }
    }

    ChannelStateCheckpointWriterTest() {
    }

    @Test
    void testFileHandleSize() throws Exception {
        ChannelStateWriter.ChannelStateWriteResult channelStateWriteResult = new ChannelStateWriter.ChannelStateWriteResult();
        ChannelStateCheckpointWriter createWriter = createWriter(channelStateWriteResult, (CheckpointStateOutputStream) new FsCheckpointStreamFactory(LocalFileSystem.getSharedInstance(), org.apache.flink.core.fs.Path.fromLocalFile(TempDirUtils.newFolder(this.temporaryFolder, new String[]{"checkpointsDir"})), org.apache.flink.core.fs.Path.fromLocalFile(TempDirUtils.newFolder(this.temporaryFolder, new String[]{"sharedStateDir"})), 5 - 1, 5 - 1).createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE));
        InputChannelInfo[] inputChannelInfoArr = (InputChannelInfo[]) IntStream.range(0, 3).mapToObj(i -> {
            return new InputChannelInfo(0, i);
        }).toArray(i2 -> {
            return new InputChannelInfo[i2];
        });
        for (int i3 = 0; i3 < 4; i3++) {
            for (int i4 = 0; i4 < 3; i4++) {
                write(createWriter, inputChannelInfoArr[i4], getData(5));
            }
        }
        createWriter.completeInput(JOB_VERTEX_ID, 0);
        createWriter.completeOutput(JOB_VERTEX_ID, 0);
        Iterator it = ((Collection) channelStateWriteResult.inputChannelStateHandles.get()).iterator();
        while (it.hasNext()) {
            Assertions.assertThat(((InputChannelStateHandle) it.next()).getStateSize()).isEqualTo((4 + 5) * 4);
        }
    }

    @Test
    void testSmallFilesNotWritten() throws Exception {
        File newFolder = TempDirUtils.newFolder(this.temporaryFolder, new String[]{"checkpointsDir"});
        File newFolder2 = TempDirUtils.newFolder(this.temporaryFolder, new String[]{"sharedStateDir"});
        FsCheckpointStreamFactory fsCheckpointStreamFactory = new FsCheckpointStreamFactory(LocalFileSystem.getSharedInstance(), org.apache.flink.core.fs.Path.fromLocalFile(newFolder), org.apache.flink.core.fs.Path.fromLocalFile(newFolder2), 100, 100);
        ChannelStateWriter.ChannelStateWriteResult channelStateWriteResult = new ChannelStateWriter.ChannelStateWriteResult();
        ChannelStateCheckpointWriter createWriter = createWriter(channelStateWriteResult, (CheckpointStateOutputStream) fsCheckpointStreamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE));
        createWriter.writeInput(JOB_VERTEX_ID, 0, new InputChannelInfo(1, 2), new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(100 / 2), FreeingBufferRecycler.INSTANCE));
        createWriter.completeOutput(JOB_VERTEX_ID, 0);
        createWriter.completeInput(JOB_VERTEX_ID, 0);
        Assertions.assertThat(channelStateWriteResult.isDone()).isTrue();
        Assertions.assertThat(newFolder).isEmptyDirectory();
        Assertions.assertThat(newFolder2).isEmptyDirectory();
    }

    @Test
    void testEmptyState() throws Exception {
        MemCheckpointStreamFactory.MemoryCheckpointOutputStream memoryCheckpointOutputStream = new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(HashBufferAccumulatorTest.NUM_TOTAL_BUFFERS) { // from class: org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriterTest.1
            public StreamStateHandle closeAndGetHandle() {
                Assertions.fail("closeAndGetHandle shouldn't be called for empty channel state");
                return null;
            }
        };
        ChannelStateCheckpointWriter createWriter = createWriter(new ChannelStateWriter.ChannelStateWriteResult(), (CheckpointStateOutputStream) memoryCheckpointOutputStream);
        createWriter.completeOutput(JOB_VERTEX_ID, 0);
        createWriter.completeInput(JOB_VERTEX_ID, 0);
        Assertions.assertThat(memoryCheckpointOutputStream.isClosed()).isTrue();
    }

    @Test
    void testRecyclingBuffers() {
        ChannelStateCheckpointWriter createWriter = createWriter(new ChannelStateWriter.ChannelStateWriteResult());
        NetworkBuffer networkBuffer = new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(10), FreeingBufferRecycler.INSTANCE);
        createWriter.writeInput(JOB_VERTEX_ID, 0, new InputChannelInfo(1, 2), networkBuffer);
        Assertions.assertThat(networkBuffer.isRecycled()).isTrue();
    }

    @Test
    void testFlush() throws Exception {
        C1FlushRecorder c1FlushRecorder = new C1FlushRecorder();
        ChannelStateCheckpointWriter channelStateCheckpointWriter = new ChannelStateCheckpointWriter(Collections.singleton(SUBTASK_ID), 1L, new ChannelStateSerializerImpl(), NO_OP_RUNNABLE, new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(42), c1FlushRecorder);
        channelStateCheckpointWriter.registerSubtaskResult(SUBTASK_ID, new ChannelStateWriter.ChannelStateWriteResult());
        channelStateCheckpointWriter.completeInput(JOB_VERTEX_ID, 0);
        channelStateCheckpointWriter.completeOutput(JOB_VERTEX_ID, 0);
        Assertions.assertThat(c1FlushRecorder.flushed).isTrue();
    }

    @Test
    void testResultCompletion() throws Exception {
        for (int i = 1; i < 10; i++) {
            testMultiTaskCompletionAndAssertResult(i);
        }
    }

    private void testMultiTaskCompletionAndAssertResult(int i) throws Exception {
        HashMap hashMap = new HashMap();
        for (int i2 = 0; i2 < i; i2++) {
            hashMap.put(SubtaskID.of(new JobVertexID(), i2), new ChannelStateWriter.ChannelStateWriteResult());
        }
        MemCheckpointStreamFactory.MemoryCheckpointOutputStream memoryCheckpointOutputStream = new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(HashBufferAccumulatorTest.NUM_TOTAL_BUFFERS);
        ChannelStateCheckpointWriter createWriter = createWriter((CheckpointStateOutputStream) memoryCheckpointOutputStream, hashMap.keySet());
        for (Map.Entry entry : hashMap.entrySet()) {
            createWriter.registerSubtaskResult((SubtaskID) entry.getKey(), (ChannelStateWriter.ChannelStateWriteResult) entry.getValue());
        }
        for (SubtaskID subtaskID : hashMap.keySet()) {
            ChannelStateWriteResultUtil.assertAllSubtaskNotDone(hashMap.values());
            Assertions.assertThat(memoryCheckpointOutputStream.isClosed()).isFalse();
            createWriter.completeInput(subtaskID.getJobVertexID(), subtaskID.getSubtaskIndex());
            ChannelStateWriteResultUtil.assertAllSubtaskNotDone(hashMap.values());
            Assertions.assertThat(memoryCheckpointOutputStream.isClosed()).isFalse();
            createWriter.completeOutput(subtaskID.getJobVertexID(), subtaskID.getSubtaskIndex());
        }
        Assertions.assertThat(memoryCheckpointOutputStream.isClosed()).isTrue();
        ChannelStateWriteResultUtil.assertAllSubtaskDoneNormally(hashMap.values());
    }

    @Test
    void testTaskUnregister() throws Exception {
        testTaskUnregisterAndAssertResult(2);
        testTaskUnregisterAndAssertResult(3);
        testTaskUnregisterAndAssertResult(5);
        testTaskUnregisterAndAssertResult(10);
    }

    private void testTaskUnregisterAndAssertResult(int i) throws Exception {
        HashMap hashMap = new HashMap();
        for (int i2 = 0; i2 < i; i2++) {
            hashMap.put(SubtaskID.of(new JobVertexID(), i2), new ChannelStateWriter.ChannelStateWriteResult());
        }
        MemCheckpointStreamFactory.MemoryCheckpointOutputStream memoryCheckpointOutputStream = new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(HashBufferAccumulatorTest.NUM_TOTAL_BUFFERS);
        ChannelStateCheckpointWriter createWriter = createWriter((CheckpointStateOutputStream) memoryCheckpointOutputStream, hashMap.keySet());
        SubtaskID subtaskID = null;
        Iterator it = hashMap.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry entry = (Map.Entry) it.next();
            if (subtaskID == null) {
                subtaskID = (SubtaskID) entry.getKey();
                it.remove();
            } else {
                createWriter.registerSubtaskResult((SubtaskID) entry.getKey(), (ChannelStateWriter.ChannelStateWriteResult) entry.getValue());
            }
        }
        for (SubtaskID subtaskID2 : hashMap.keySet()) {
            createWriter.completeInput(subtaskID2.getJobVertexID(), subtaskID2.getSubtaskIndex());
            createWriter.completeOutput(subtaskID2.getJobVertexID(), subtaskID2.getSubtaskIndex());
        }
        ChannelStateWriteResultUtil.assertAllSubtaskNotDone(hashMap.values());
        Assertions.assertThat(memoryCheckpointOutputStream.isClosed()).isFalse();
        if (!$assertionsDisabled && subtaskID == null) {
            throw new AssertionError();
        }
        createWriter.releaseSubtask(subtaskID);
        Assertions.assertThat(memoryCheckpointOutputStream.isClosed()).isTrue();
        ChannelStateWriteResultUtil.assertAllSubtaskDoneNormally(hashMap.values());
    }

    @Test
    void testTaskFailThenCompleteOtherTask() {
        testTaskFailAfterAllTaskRegisteredAndAssertResult(2);
        testTaskFailAfterAllTaskRegisteredAndAssertResult(3);
        testTaskFailAfterAllTaskRegisteredAndAssertResult(5);
        testTaskFailAfterAllTaskRegisteredAndAssertResult(10);
    }

    private void testTaskFailAfterAllTaskRegisteredAndAssertResult(int i) {
        HashMap hashMap = new HashMap();
        for (int i2 = 0; i2 < i; i2++) {
            hashMap.put(SubtaskID.of(new JobVertexID(), i2), new ChannelStateWriter.ChannelStateWriteResult());
        }
        MemCheckpointStreamFactory.MemoryCheckpointOutputStream memoryCheckpointOutputStream = new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(HashBufferAccumulatorTest.NUM_TOTAL_BUFFERS);
        ChannelStateCheckpointWriter createWriter = createWriter((CheckpointStateOutputStream) memoryCheckpointOutputStream, hashMap.keySet());
        SubtaskID subtaskID = null;
        for (Map.Entry entry : hashMap.entrySet()) {
            if (subtaskID == null) {
                subtaskID = (SubtaskID) entry.getKey();
            }
            createWriter.registerSubtaskResult((SubtaskID) entry.getKey(), (ChannelStateWriter.ChannelStateWriteResult) entry.getValue());
        }
        Assertions.assertThat(memoryCheckpointOutputStream.isClosed()).isFalse();
        if (!$assertionsDisabled && subtaskID == null) {
            throw new AssertionError();
        }
        createWriter.fail(subtaskID.getJobVertexID(), subtaskID.getSubtaskIndex(), new TestException());
        Assertions.assertThat(memoryCheckpointOutputStream.isClosed()).isTrue();
        for (Map.Entry entry2 : hashMap.entrySet()) {
            if (subtaskID.equals(entry2.getKey())) {
                ChannelStateWriteResultUtil.assertHasSpecialCause((ChannelStateWriter.ChannelStateWriteResult) entry2.getValue(), TestException.class);
            } else {
                ChannelStateWriteResultUtil.assertCheckpointFailureReason((ChannelStateWriter.ChannelStateWriteResult) entry2.getValue(), CheckpointFailureReason.CHANNEL_STATE_SHARED_STREAM_EXCEPTION);
            }
        }
    }

    @Test
    void testCloseGetHandleThrowException() throws Exception {
        HashMap hashMap = new HashMap();
        for (int i = 0; i < 5; i++) {
            hashMap.put(SubtaskID.of(new JobVertexID(), i), new ChannelStateWriter.ChannelStateWriteResult());
        }
        CloseExceptionOutputStream closeExceptionOutputStream = new CloseExceptionOutputStream();
        ChannelStateCheckpointWriter createWriter = createWriter((CheckpointStateOutputStream) closeExceptionOutputStream, hashMap.keySet());
        for (Map.Entry entry : hashMap.entrySet()) {
            SubtaskID subtaskID = (SubtaskID) entry.getKey();
            createWriter.registerSubtaskResult(subtaskID, (ChannelStateWriter.ChannelStateWriteResult) entry.getValue());
            createWriter.writeInput(subtaskID.getJobVertexID(), subtaskID.getSubtaskIndex(), new InputChannelInfo(1, 2), new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(10), FreeingBufferRecycler.INSTANCE));
        }
        for (SubtaskID subtaskID2 : hashMap.keySet()) {
            ChannelStateWriteResultUtil.assertAllSubtaskNotDone(hashMap.values());
            Assertions.assertThat(closeExceptionOutputStream.isClosed()).isFalse();
            createWriter.completeInput(subtaskID2.getJobVertexID(), subtaskID2.getSubtaskIndex());
            ChannelStateWriteResultUtil.assertAllSubtaskNotDone(hashMap.values());
            Assertions.assertThat(closeExceptionOutputStream.isClosed()).isFalse();
            createWriter.completeOutput(subtaskID2.getJobVertexID(), subtaskID2.getSubtaskIndex());
        }
        Assertions.assertThat(closeExceptionOutputStream.isClosed()).isTrue();
        for (Map.Entry entry2 : hashMap.entrySet()) {
            Assertions.assertThatThrownBy(() -> {
                ((ChannelStateWriter.ChannelStateWriteResult) entry2.getValue()).getInputChannelStateHandles().get();
            }).cause().isInstanceOf(IOException.class).hasMessage("Test closeAndGetHandle exception.");
            Assertions.assertThatThrownBy(() -> {
                ((ChannelStateWriter.ChannelStateWriteResult) entry2.getValue()).getResultSubpartitionStateHandles().get();
            }).cause().isInstanceOf(IOException.class).hasMessage("Test closeAndGetHandle exception.");
        }
    }

    @Test
    void testRegisterSubtaskAfterWriterDone() {
        HashMap hashMap = new HashMap();
        SubtaskID of = SubtaskID.of(JOB_VERTEX_ID, 0);
        SubtaskID of2 = SubtaskID.of(JOB_VERTEX_ID, 1);
        hashMap.put(of, new ChannelStateWriter.ChannelStateWriteResult());
        hashMap.put(of2, new ChannelStateWriter.ChannelStateWriteResult());
        ChannelStateCheckpointWriter createWriter = createWriter((CheckpointStateOutputStream) new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(HashBufferAccumulatorTest.NUM_TOTAL_BUFFERS), hashMap.keySet());
        createWriter.fail(new JobVertexID(), 0, new TestException());
        Assertions.assertThatThrownBy(() -> {
            createWriter.registerSubtaskResult(of, new ChannelStateWriter.ChannelStateWriteResult());
        }).isInstanceOf(IllegalStateException.class).hasMessage("The write is done.");
        Assertions.assertThatThrownBy(() -> {
            createWriter.registerSubtaskResult(of2, new ChannelStateWriter.ChannelStateWriteResult());
        }).isInstanceOf(IllegalStateException.class).hasMessage("The write is done.");
    }

    @Test
    void testRecordingOffsets() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put(new InputChannelInfo(1, 1), 1);
        hashMap.put(new InputChannelInfo(1, 2), 2);
        hashMap.put(new InputChannelInfo(1, 3), 5);
        ChannelStateWriter.ChannelStateWriteResult channelStateWriteResult = new ChannelStateWriter.ChannelStateWriteResult();
        ChannelStateCheckpointWriter createWriter = createWriter(channelStateWriteResult);
        for (Map.Entry entry : hashMap.entrySet()) {
            for (int i = 0; i < ((Integer) entry.getValue()).intValue(); i++) {
                write(createWriter, (InputChannelInfo) entry.getKey(), getData(100));
            }
        }
        createWriter.completeInput(JOB_VERTEX_ID, 0);
        createWriter.completeOutput(JOB_VERTEX_ID, 0);
        for (InputChannelStateHandle inputChannelStateHandle : (Collection) channelStateWriteResult.inputChannelStateHandles.get()) {
            Assertions.assertThat(inputChannelStateHandle.getOffsets()).isEqualTo(Collections.singletonList(Long.valueOf(4)));
            Assertions.assertThat(inputChannelStateHandle.getDelegate().getStateSize()).isEqualTo(4 + 4 + (100 * ((Integer) hashMap.remove(inputChannelStateHandle.getInfo())).intValue()));
        }
        Assertions.assertThat(hashMap).isEmpty();
    }

    private byte[] getData(int i) {
        byte[] bArr = new byte[i];
        this.random.nextBytes(bArr);
        return bArr;
    }

    private void write(ChannelStateCheckpointWriter channelStateCheckpointWriter, InputChannelInfo inputChannelInfo, byte[] bArr) {
        MemorySegment wrap = MemorySegmentFactory.wrap(bArr);
        channelStateCheckpointWriter.writeInput(JOB_VERTEX_ID, 0, inputChannelInfo, new NetworkBuffer(wrap, FreeingBufferRecycler.INSTANCE, Buffer.DataType.DATA_BUFFER, wrap.size()));
    }

    private ChannelStateCheckpointWriter createWriter(ChannelStateWriter.ChannelStateWriteResult channelStateWriteResult) {
        return createWriter(channelStateWriteResult, (CheckpointStateOutputStream) new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(HashBufferAccumulatorTest.NUM_TOTAL_BUFFERS));
    }

    private ChannelStateCheckpointWriter createWriter(ChannelStateWriter.ChannelStateWriteResult channelStateWriteResult, CheckpointStateOutputStream checkpointStateOutputStream) {
        ChannelStateCheckpointWriter createWriter = createWriter(checkpointStateOutputStream, Collections.singleton(SUBTASK_ID));
        createWriter.registerSubtaskResult(SUBTASK_ID, channelStateWriteResult);
        return createWriter;
    }

    private ChannelStateCheckpointWriter createWriter(CheckpointStateOutputStream checkpointStateOutputStream, Set<SubtaskID> set) {
        return new ChannelStateCheckpointWriter(set, 1L, checkpointStateOutputStream, new ChannelStateSerializerImpl(), NO_OP_RUNNABLE);
    }

    static {
        $assertionsDisabled = !ChannelStateCheckpointWriterTest.class.desiredAssertionStatus();
        NO_OP_RUNNABLE = () -> {
        };
        JOB_VERTEX_ID = new JobVertexID();
        SUBTASK_ID = SubtaskID.of(JOB_VERTEX_ID, 0);
    }
}
