package org.apache.flink.runtime.checkpoint.channel;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingDeque;
import javax.annotation.Nonnull;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.state.ChannelPersistenceITCase;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.function.BiConsumerWithException;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImplTest.class */
public class ChannelStateWriteRequestExecutorImplTest {
    private static final String TASK_NAME = "test task";

    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImplTest$TestRequestDispatcher.class */
    private static class TestRequestDispatcher implements ChannelStateWriteRequestDispatcher {
        private boolean isStopped;

        private TestRequestDispatcher() {
        }

        public void dispatch(ChannelStateWriteRequest channelStateWriteRequest) {
        }

        public void fail(Throwable th) {
            this.isStopped = true;
        }

        public boolean isStopped() {
            return this.isStopped;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImplTest$TestWriteRequest.class */
    public static class TestWriteRequest implements ChannelStateWriteRequest {
        private boolean cancelled;

        private TestWriteRequest() {
            this.cancelled = false;
        }

        public long getCheckpointId() {
            return 0L;
        }

        public void cancel(Throwable th) {
            this.cancelled = true;
        }

        public boolean isCancelled() {
            return this.cancelled;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImplTest$WorkerClosingDeque.class */
    /**
     * Deque that closes the configured worker immediately after an element has been enqueued via
     * {@link #put} or {@link #putFirst}. The tests use it to provoke a close that happens right
     * after a request was added.
     *
     * <p>{@link #setWorker} must be called before the first {@code put}/{@code putFirst}.
     */
    public static class WorkerClosingDeque extends LinkedBlockingDeque<ChannelStateWriteRequest> {
        /** Executor closed after every insertion; assigned through {@link #setWorker}. */
        private ChannelStateWriteRequestExecutor worker;

        private WorkerClosingDeque() {
        }

        /**
         * Inserts the request at the head of the deque and then closes the worker.
         *
         * @param channelStateWriteRequest request to enqueue
         * @throws InterruptedException if interrupted while waiting to insert
         */
        @Override // java.util.concurrent.LinkedBlockingDeque, java.util.concurrent.BlockingDeque, java.util.concurrent.BlockingQueue
        public void put(@Nonnull ChannelStateWriteRequest channelStateWriteRequest) throws InterruptedException {
            // Pass the request itself: the deque's element type is ChannelStateWriteRequest.
            super.putFirst(channelStateWriteRequest);
            closeWorker();
        }

        /**
         * Inserts the request at the head of the deque and then closes the worker.
         *
         * @param channelStateWriteRequest request to enqueue
         * @throws InterruptedException if interrupted while waiting to insert
         */
        @Override // java.util.concurrent.LinkedBlockingDeque, java.util.concurrent.BlockingDeque
        public void putFirst(@Nonnull ChannelStateWriteRequest channelStateWriteRequest) throws InterruptedException {
            super.putFirst(channelStateWriteRequest);
            closeWorker();
        }

        /**
         * Sets the executor that is closed after each insertion.
         *
         * @param channelStateWriteRequestExecutor executor to close
         */
        public void setWorker(ChannelStateWriteRequestExecutor channelStateWriteRequestExecutor) {
            this.worker = channelStateWriteRequestExecutor;
        }

        /** Closes the worker, handing any {@link IOException} to {@link ExceptionUtils#rethrow}. */
        private void closeWorker() {
            try {
                worker.close();
            } catch (IOException e) {
                ExceptionUtils.rethrow(e);
            }
        }
    }

    /**
     * Runs the close-after-submit scenario through {@code submit}; the test passes only if an
     * {@link IllegalStateException} is thrown.
     */
    @Test(expected = IllegalStateException.class)
    public void testCloseAfterSubmit() throws Exception {
        testCloseAfterSubmit(ChannelStateWriteRequestExecutor::submit);
    }

    /**
     * Runs the close-after-submit scenario through {@code submitPriority}; the test passes only
     * if an {@link IllegalStateException} is thrown.
     */
    @Test(expected = IllegalStateException.class)
    public void testCloseAfterSubmitPriority() throws Exception {
        testCloseAfterSubmit(ChannelStateWriteRequestExecutor::submitPriority);
    }

    /** Runs the submit-failure scenario through {@code submit}. */
    @Test
    public void testSubmitFailure() throws Exception {
        testSubmitFailure(ChannelStateWriteRequestExecutor::submit);
    }

    /** Runs the submit-failure scenario through {@code submitPriority}. */
    @Test
    public void testSubmitPriorityFailure() throws Exception {
        testSubmitFailure(ChannelStateWriteRequestExecutor::submitPriority);
    }

    /**
     * Shared body of the close-after-submit tests. Builds an executor on top of a
     * {@link WorkerClosingDeque} (which closes the executor as soon as a request is enqueued),
     * performs the given submit action and then checks that the deque is empty and the request
     * was not cancelled.
     *
     * <p>The calling tests declare {@link IllegalStateException} as the expected outcome, so the
     * submit action is presumably where execution ends in the passing case.
     *
     * @param biConsumerWithException submit action applied to the executor and the test request
     * @throws Exception whatever the submit action throws
     */
    private void testCloseAfterSubmit(BiConsumerWithException<ChannelStateWriteRequestExecutor, ChannelStateWriteRequest, Exception> biConsumerWithException) throws Exception {
        WorkerClosingDeque closingDeque = new WorkerClosingDeque();
        ChannelStateWriteRequestExecutorImpl worker =
                new ChannelStateWriteRequestExecutorImpl(TASK_NAME, ChannelStateWriteRequestDispatcher.NO_OP, closingDeque);
        // The deque needs a back-reference so it can close the worker on insertion.
        closingDeque.setWorker(worker);

        TestWriteRequest request = new TestWriteRequest();
        biConsumerWithException.accept(worker, request);

        Assert.assertTrue(closingDeque.isEmpty());
        Assert.assertFalse(request.isCancelled());
    }

    /**
     * Shared body of the submit-failure tests. Applies the submit action to an executor that was
     * never started and requires an {@link IllegalStateException}. Whatever the outcome, the
     * request must have been cancelled and the deque must be empty afterwards.
     *
     * @param biConsumerWithException submit action applied to the executor and the test request
     * @throws Exception any exception other than {@link IllegalStateException} raised by the
     *     submit action, or a {@link RuntimeException} if the action did not fail at all
     */
    private void testSubmitFailure(BiConsumerWithException<ChannelStateWriteRequestExecutor, ChannelStateWriteRequest, Exception> biConsumerWithException) throws Exception {
        TestWriteRequest request = new TestWriteRequest();
        LinkedBlockingDeque<ChannelStateWriteRequest> deque = new LinkedBlockingDeque<>();
        try {
            biConsumerWithException.accept(
                    new ChannelStateWriteRequestExecutorImpl(TASK_NAME, ChannelStateWriteRequestDispatcher.NO_OP, deque),
                    request);
        } catch (IllegalStateException expected) {
            // This is the outcome the test is looking for.
            return;
        } finally {
            // Checked on every path: success, expected failure and unexpected failure.
            Assert.assertTrue(request.cancelled);
            Assert.assertTrue(deque.isEmpty());
        }
        throw new RuntimeException("expected exception not thrown");
    }

    /**
     * Checks clean-up when the executor is closed before it runs: after {@code close()} followed
     * by {@code run()}, the dispatcher must have been failed, the deque drained and the queued
     * request cancelled.
     */
    @Test
    public void testCleanup() throws IOException {
        TestWriteRequest request = new TestWriteRequest();
        LinkedBlockingDeque<ChannelStateWriteRequest> deque = new LinkedBlockingDeque<>();
        deque.add(request);
        TestRequestDispatcher dispatcher = new TestRequestDispatcher();
        ChannelStateWriteRequestExecutorImpl worker =
                new ChannelStateWriteRequestExecutorImpl(TASK_NAME, dispatcher, deque);

        // Close first, then run on the calling thread.
        worker.close();
        worker.run();

        Assert.assertTrue(dispatcher.isStopped());
        Assert.assertTrue(deque.isEmpty());
        Assert.assertTrue(request.isCancelled());
    }

    /**
     * Starts the executor, interrupts its thread before and after submitting a request, and then
     * waits until the deque has been drained. The test passes if the request is still consumed
     * and closing the executor does not throw.
     *
     * <p>The executor is managed by try-with-resources so it is closed exactly once, with any
     * close failure attached as a suppressed exception when the body has already failed.
     */
    @Test
    public void testIgnoresInterruptsWhileRunning() throws Exception {
        TestRequestDispatcher dispatcher = new TestRequestDispatcher();
        LinkedBlockingDeque<ChannelStateWriteRequest> deque = new LinkedBlockingDeque<>();
        try (ChannelStateWriteRequestExecutorImpl worker =
                new ChannelStateWriteRequestExecutorImpl(TASK_NAME, dispatcher, deque)) {
            worker.start();
            worker.getThread().interrupt();
            worker.submit(new TestWriteRequest());
            worker.getThread().interrupt();
            // Poll until the worker thread has taken the request off the deque.
            while (!deque.isEmpty()) {
                Thread.sleep(100L);
            }
        }
    }

    /**
     * Starts an executor backed by a real {@code ChannelStateWriteRequestDispatcherImpl}, submits
     * a checkpoint-start request followed by two write requests whose data futures are never
     * completed, and then closes the executor. The test passes if closing does not throw.
     *
     * <p>The executor is managed by try-with-resources so it is closed exactly once.
     */
    @Test
    public void testCanBeClosed() throws Exception {
        ChannelStateWriteRequestDispatcherImpl dispatcher =
                new ChannelStateWriteRequestDispatcherImpl(
                        "dummy task", 0, ChannelPersistenceITCase.getStreamFactoryFactory(), new ChannelStateSerializerImpl());
        try (ChannelStateWriteRequestExecutorImpl worker =
                new ChannelStateWriteRequestExecutorImpl(TASK_NAME, dispatcher)) {
            worker.start();
            worker.submit(
                    new CheckpointStartRequest(
                            1L, new ChannelStateWriter.ChannelStateWriteResult(), CheckpointStorageLocationReference.getDefault()));
            // The futures are deliberately left incomplete.
            worker.submit(ChannelStateWriteRequest.write(1L, new ResultSubpartitionInfo(0, 0), new CompletableFuture<>()));
            worker.submit(ChannelStateWriteRequest.write(1L, new ResultSubpartitionInfo(0, 0), new CompletableFuture<>()));
        }
    }

    /**
     * Checks that a failure raised by the dispatcher while the executor runs is reported again
     * when the executor is closed: {@code close()} must throw an {@link IOException} whose cause
     * chain contains the very same exception instance the dispatcher threw.
     *
     * @throws IOException if {@code close()} fails with an exception that does not carry the
     *     dispatcher's exception
     */
    @Test
    public void testRecordsException() throws IOException {
        final TestException testException = new TestException();
        // Dispatcher that fails every request with the known exception instance.
        TestRequestDispatcher throwingDispatcher = new TestRequestDispatcher() {
            @Override
            public void dispatch(ChannelStateWriteRequest channelStateWriteRequest) {
                throw testException;
            }
        };
        LinkedBlockingDeque<ChannelStateWriteRequest> deque =
                new LinkedBlockingDeque<>(Arrays.asList(new TestWriteRequest()));
        ChannelStateWriteRequestExecutorImpl worker =
                new ChannelStateWriteRequestExecutorImpl(TASK_NAME, throwingDispatcher, deque);

        worker.run();
        try {
            worker.close();
            Assert.fail("exception not thrown");
        } catch (IOException e) {
            // Identity comparison: it must be the same instance, not merely the same type.
            boolean carriesOriginal =
                    ExceptionUtils.findThrowable(e, TestException.class)
                            .filter(found -> found == testException)
                            .isPresent();
            if (!carriesOriginal) {
                throw e;
            }
        }
    }
}
