package org.apache.flink.runtime.checkpoint.channel;

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.function.BiConsumerWithException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.class */
class ChannelStateWriterImplTest {
    private static final long CHECKPOINT_ID = 42;
    private static final String TASK_NAME = "test";
    private static final int SUBTASK_INDEX = 0;
    private static final JobID JOB_ID = new JobID();
    private static final JobVertexID JOB_VERTEX_ID = new JobVertexID();
    private static final CheckpointStorage CHECKPOINT_STORAGE = new JobManagerCheckpointStorage();

    ChannelStateWriterImplTest() {
    }

    @Test
    void testAddEventBuffer() throws Exception {
        NetworkBuffer buffer = getBuffer();
        NetworkBuffer buffer2 = getBuffer();
        buffer2.setDataType(Buffer.DataType.EVENT_BUFFER);
        executeCallbackWithSyncWorker((channelStateWriter, syncChannelStateWriteRequestExecutor) -> {
            callStart(channelStateWriter);
            callAddInputData(channelStateWriter, buffer2, buffer);
            syncChannelStateWriteRequestExecutor.getClass();
            Assertions.assertThatThrownBy(syncChannelStateWriteRequestExecutor::processAllRequests).isInstanceOf(IllegalArgumentException.class);
        });
        Assertions.assertThat(buffer.isRecycled()).isTrue();
    }

    @Test
    void testResultCompletion() throws IOException {
        ChannelStateWriterImpl openWriter = openWriter();
        Throwable th = null;
        try {
            try {
                callStart(openWriter);
                ChannelStateWriter.ChannelStateWriteResult andRemoveWriteResult = openWriter.getAndRemoveWriteResult(CHECKPOINT_ID);
                Assertions.assertThat(andRemoveWriteResult.resultSubpartitionStateHandles).isNotDone();
                Assertions.assertThat(andRemoveWriteResult.inputChannelStateHandles).isNotDone();
                if (openWriter != null) {
                    if (0 != 0) {
                        try {
                            openWriter.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        openWriter.close();
                    }
                }
                Assertions.assertThat(andRemoveWriteResult.inputChannelStateHandles).isDone();
                Assertions.assertThat(andRemoveWriteResult.resultSubpartitionStateHandles).isDone();
            } finally {
            }
        } catch (Throwable th3) {
            if (openWriter != null) {
                if (th != null) {
                    try {
                        openWriter.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    openWriter.close();
                }
            }
            throw th3;
        }
    }

    @Test
    void testAbort() throws Exception {
        NetworkBuffer buffer = getBuffer();
        executeCallbackWithSyncWorker((channelStateWriter, syncChannelStateWriteRequestExecutor) -> {
            callStart(channelStateWriter);
            ChannelStateWriter.ChannelStateWriteResult andRemoveWriteResult = channelStateWriter.getAndRemoveWriteResult(CHECKPOINT_ID);
            callAddInputData(channelStateWriter, buffer);
            callAbort(channelStateWriter);
            syncChannelStateWriteRequestExecutor.processAllRequests();
            Assertions.assertThat(andRemoveWriteResult.isDone()).isTrue();
            Assertions.assertThat(buffer.isRecycled()).isTrue();
        });
    }

    @Test
    void testAbortClearsResults() throws Exception {
        executeCallbackWithSyncWorker((channelStateWriter, syncChannelStateWriteRequestExecutor) -> {
            callStart(channelStateWriter);
            channelStateWriter.abort(CHECKPOINT_ID, new TestException(), true);
            Assertions.assertThatThrownBy(() -> {
                channelStateWriter.getAndRemoveWriteResult(CHECKPOINT_ID);
            }).isInstanceOf(IllegalArgumentException.class);
        });
    }

    @Test
    void testAbortDoesNotClearsResults() throws Exception {
        executeCallbackWithSyncWorker((channelStateWriter, syncChannelStateWriteRequestExecutor) -> {
            callStart(channelStateWriter);
            callAbort(channelStateWriter);
            syncChannelStateWriteRequestExecutor.processAllRequests();
            channelStateWriter.getAndRemoveWriteResult(CHECKPOINT_ID);
        });
    }

    @Test
    void testAbortIgnoresMissing() throws Exception {
        executeCallbackAndProcessWithSyncWorker(this::callAbort);
    }

    @Test
    void testAbortOldAndStartNewCheckpoint() throws Exception {
        executeCallbackWithSyncWorker((channelStateWriter, syncChannelStateWriteRequestExecutor) -> {
            channelStateWriter.start(42, CheckpointOptions.forCheckpointWithDefaultLocation());
            channelStateWriter.abort(42, new TestException(), false);
            channelStateWriter.start(43, CheckpointOptions.forCheckpointWithDefaultLocation());
            syncChannelStateWriteRequestExecutor.processAllRequests();
            ChannelStateWriter.ChannelStateWriteResult andRemoveWriteResult = channelStateWriter.getAndRemoveWriteResult(42);
            Assertions.assertThat(andRemoveWriteResult.isDone()).isTrue();
            Assertions.assertThatThrownBy(() -> {
            }).as("The result should have failed.", new Object[0]).hasCauseInstanceOf(TestException.class);
            Assertions.assertThat(channelStateWriter.getAndRemoveWriteResult(43).isDone()).isFalse();
        });
    }

    @Test
    void testBuffersRecycledOnError() {
        NetworkBuffer buffer = getBuffer();
        ChannelStateWriterImpl channelStateWriterImpl = new ChannelStateWriterImpl(JOB_VERTEX_ID, TASK_NAME, 0, new ConcurrentHashMap(), failingWorker(), 5);
        Assertions.assertThatThrownBy(() -> {
            callAddInputData(channelStateWriterImpl, buffer);
        }).isInstanceOf(RuntimeException.class).hasCauseInstanceOf(TestException.class);
        Assertions.assertThat(buffer.isRecycled()).isTrue();
    }

    @Test
    void testBuffersRecycledOnClose() throws Exception {
        NetworkBuffer buffer = getBuffer();
        executeCallbackAndProcessWithSyncWorker(channelStateWriter -> {
            callStart(channelStateWriter);
            callAddInputData(channelStateWriter, buffer);
            Assertions.assertThat(buffer.isRecycled()).isFalse();
        });
        Assertions.assertThat(buffer.isRecycled()).isTrue();
    }

    @Test
    void testNoAddDataAfterFinished() throws Exception {
        executeCallbackWithSyncWorker((channelStateWriter, syncChannelStateWriteRequestExecutor) -> {
            callStart(channelStateWriter);
            callFinish(channelStateWriter);
            syncChannelStateWriteRequestExecutor.processAllRequests();
            callAddInputData(channelStateWriter, new NetworkBuffer[0]);
            syncChannelStateWriteRequestExecutor.getClass();
            Assertions.assertThatThrownBy(syncChannelStateWriteRequestExecutor::processAllRequests).isInstanceOf(IllegalArgumentException.class);
        });
    }

    @Test
    void testAddDataNotStarted() {
        Assertions.assertThatThrownBy(() -> {
            executeCallbackAndProcessWithSyncWorker(channelStateWriter -> {
                this.callAddInputData(channelStateWriter, new NetworkBuffer[0]);
            });
        }).isInstanceOf(IllegalArgumentException.class);
    }

    @Test
    void testFinishNotStarted() {
        Assertions.assertThatThrownBy(() -> {
            executeCallbackAndProcessWithSyncWorker(this::callFinish);
        }).isInstanceOf(IllegalArgumentException.class);
    }

    @Test
    void testRethrowOnClose() {
        Assertions.assertThatThrownBy(() -> {
            executeCallbackAndProcessWithSyncWorker(channelStateWriter -> {
                try {
                    callFinish(channelStateWriter);
                } catch (IllegalArgumentException e) {
                }
            });
        }).isInstanceOf(IllegalArgumentException.class);
    }

    @Test
    void testRethrowOnNextCall() {
        SyncChannelStateWriteRequestExecutor syncChannelStateWriteRequestExecutor = new SyncChannelStateWriteRequestExecutor(JOB_ID);
        ChannelStateWriterImpl channelStateWriterImpl = new ChannelStateWriterImpl(JOB_VERTEX_ID, TASK_NAME, 0, new ConcurrentHashMap(), syncChannelStateWriteRequestExecutor, 5);
        syncChannelStateWriteRequestExecutor.registerSubtask(JOB_VERTEX_ID, 0);
        syncChannelStateWriteRequestExecutor.setThrown(new TestException());
        Assertions.assertThatThrownBy(() -> {
            callStart(channelStateWriterImpl);
        }).hasCauseInstanceOf(TestException.class);
    }

    @Test
    void testLimit() throws IOException {
        int i = 3;
        ChannelStateWriterImpl channelStateWriterImpl = new ChannelStateWriterImpl(JOB_VERTEX_ID, TASK_NAME, 0, () -> {
            return CHECKPOINT_STORAGE.createCheckpointStorage(JOB_ID);
        }, 3, new ChannelStateWriteRequestExecutorFactory(JOB_ID), 5);
        Throwable th = null;
        for (int i2 = 0; i2 < 3; i2++) {
            try {
                try {
                    channelStateWriterImpl.start(i2, CheckpointOptions.forCheckpointWithDefaultLocation());
                } catch (Throwable th2) {
                    th = th2;
                    throw th2;
                }
            } catch (Throwable th3) {
                if (channelStateWriterImpl != null) {
                    if (th != null) {
                        try {
                            channelStateWriterImpl.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        channelStateWriterImpl.close();
                    }
                }
                throw th3;
            }
        }
        Assertions.assertThatThrownBy(() -> {
            channelStateWriterImpl.start(i, CheckpointOptions.forCheckpointWithDefaultLocation());
        }).isInstanceOf(IllegalStateException.class);
        if (channelStateWriterImpl != null) {
            if (0 == 0) {
                channelStateWriterImpl.close();
                return;
            }
            try {
                channelStateWriterImpl.close();
            } catch (Throwable th5) {
                th.addSuppressed(th5);
            }
        }
    }

    @Test
    void testNoStartAfterClose() throws IOException {
        ChannelStateWriterImpl openWriter = openWriter();
        openWriter.close();
        Assertions.assertThatThrownBy(() -> {
            openWriter.start(CHECKPOINT_ID, CheckpointOptions.forCheckpointWithDefaultLocation());
        }).hasCauseInstanceOf(IllegalStateException.class);
    }

    @Test
    void testNoAddDataAfterClose() throws IOException {
        ChannelStateWriterImpl openWriter = openWriter();
        callStart(openWriter);
        openWriter.close();
        Assertions.assertThatThrownBy(() -> {
            callAddInputData(openWriter, new NetworkBuffer[0]);
        }).hasCauseInstanceOf(IllegalStateException.class);
    }

    private NetworkBuffer getBuffer() {
        return new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(123, (Object) null), FreeingBufferRecycler.INSTANCE);
    }

    private ChannelStateWriteRequestExecutor failingWorker() {
        return new ChannelStateWriteRequestExecutor() { // from class: org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImplTest.1
            public void submit(ChannelStateWriteRequest channelStateWriteRequest) {
                throw new TestException();
            }

            public void submitPriority(ChannelStateWriteRequest channelStateWriteRequest) {
                throw new TestException();
            }

            public void start() throws IllegalStateException {
            }

            public void registerSubtask(JobVertexID jobVertexID, int i) {
            }

            public void releaseSubtask(JobVertexID jobVertexID, int i) {
            }
        };
    }

    private void executeCallbackAndProcessWithSyncWorker(Consumer<ChannelStateWriter> consumer) throws Exception {
        executeCallbackWithSyncWorker((channelStateWriter, syncChannelStateWriteRequestExecutor) -> {
            consumer.accept(channelStateWriter);
            syncChannelStateWriteRequestExecutor.processAllRequests();
        });
    }

    private void executeCallbackWithSyncWorker(BiConsumerWithException<ChannelStateWriter, SyncChannelStateWriteRequestExecutor, Exception> biConsumerWithException) throws Exception {
        SyncChannelStateWriteRequestExecutor syncChannelStateWriteRequestExecutor = new SyncChannelStateWriteRequestExecutor(JOB_ID);
        try {
            ChannelStateWriterImpl channelStateWriterImpl = new ChannelStateWriterImpl(JOB_VERTEX_ID, TASK_NAME, 0, new ConcurrentHashMap(), syncChannelStateWriteRequestExecutor, 5);
            Throwable th = null;
            try {
                syncChannelStateWriteRequestExecutor.registerSubtask(JOB_VERTEX_ID, 0);
                biConsumerWithException.accept(channelStateWriterImpl, syncChannelStateWriteRequestExecutor);
                if (channelStateWriterImpl != null) {
                    if (0 != 0) {
                        try {
                            channelStateWriterImpl.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        channelStateWriterImpl.close();
                    }
                }
            } finally {
            }
        } finally {
            syncChannelStateWriteRequestExecutor.releaseSubtask(JOB_VERTEX_ID, 0);
        }
    }

    private ChannelStateWriterImpl openWriter() throws IOException {
        return new ChannelStateWriterImpl(JOB_VERTEX_ID, TASK_NAME, 0, () -> {
            return CHECKPOINT_STORAGE.createCheckpointStorage(JOB_ID);
        }, new ChannelStateWriteRequestExecutorFactory(JOB_ID), 5);
    }

    private void callStart(ChannelStateWriter channelStateWriter) {
        channelStateWriter.start(CHECKPOINT_ID, CheckpointOptions.forCheckpointWithDefaultLocation());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void callAddInputData(ChannelStateWriter channelStateWriter, NetworkBuffer... networkBufferArr) {
        channelStateWriter.addInputData(CHECKPOINT_ID, new InputChannelInfo(1, 1), 1, CloseableIterator.ofElements((v0) -> {
            v0.recycleBuffer();
        }, networkBufferArr));
    }

    private void callAbort(ChannelStateWriter channelStateWriter) {
        channelStateWriter.abort(CHECKPOINT_ID, new TestException(), false);
    }

    private void callFinish(ChannelStateWriter channelStateWriter) {
        channelStateWriter.finishInput(CHECKPOINT_ID);
        channelStateWriter.finishOutput(CHECKPOINT_ID);
    }
}
