package org.apache.flink.runtime.state;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import javax.annotation.Nonnull;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.util.Preconditions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/state/AsyncSnapshotCallableTest.class */
public class AsyncSnapshotCallableTest {
    private static final String METHOD_CALL = "callInternal";
    private static final String METHOD_LOG = "logAsyncSnapshotComplete";
    private static final String METHOD_CLEANUP = "cleanupProvidedResources";
    private static final String METHOD_CANCEL = "cancel";
    private static final String SUCCESS = "Success!";
    private CloseableRegistry ownerRegistry;
    private TestBlockingCloseable testProvidedResource;
    private TestBlockingCloseable testBlocker;
    private TestAsyncSnapshotCallable testAsyncSnapshotCallable;
    private FutureTask<String> task;

    /* loaded from: input_file:org/apache/flink/runtime/state/AsyncSnapshotCallableTest$TestAsyncSnapshotCallable.class */
    private static class TestAsyncSnapshotCallable extends AsyncSnapshotCallable<String> {

        @Nonnull
        private final TestBlockingCloseable providedResource;

        @Nonnull
        private final TestBlockingCloseable blockingResource;

        @Nonnull
        private final List<String> invocationOrder = new ArrayList();

        TestAsyncSnapshotCallable(@Nonnull TestBlockingCloseable testBlockingCloseable, @Nonnull TestBlockingCloseable testBlockingCloseable2) {
            this.providedResource = testBlockingCloseable;
            this.blockingResource = testBlockingCloseable2;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: callInternal, reason: merged with bridge method [inline-methods] */
        public String m565callInternal() throws Exception {
            addInvocation(AsyncSnapshotCallableTest.METHOD_CALL);
            this.snapshotCloseableRegistry.registerCloseable(this.blockingResource);
            try {
                this.blockingResource.simulateBlockingOperation();
            } finally {
                if (this.snapshotCloseableRegistry.unregisterCloseable(this.blockingResource)) {
                    this.blockingResource.close();
                }
            }
        }

        protected void cleanupProvidedResources() {
            addInvocation(AsyncSnapshotCallableTest.METHOD_CLEANUP);
            this.providedResource.close();
        }

        protected void logAsyncSnapshotComplete(long j) {
            this.invocationOrder.add(AsyncSnapshotCallableTest.METHOD_LOG);
        }

        protected void cancel() {
            addInvocation(AsyncSnapshotCallableTest.METHOD_CANCEL);
            super.cancel();
        }

        @Nonnull
        public List<String> getInvocationOrder() {
            ArrayList arrayList;
            synchronized (this.invocationOrder) {
                arrayList = new ArrayList(this.invocationOrder);
            }
            return arrayList;
        }

        private void addInvocation(@Nonnull String str) {
            synchronized (this.invocationOrder) {
                this.invocationOrder.add(str);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/state/AsyncSnapshotCallableTest$TestBlockingCloseable.class */
    public static class TestBlockingCloseable implements Closeable {
        private final OneShotLatch blockerLatch;
        private boolean closed;
        private boolean unblocked;
        private boolean exceptionally;

        private TestBlockingCloseable() {
            this.blockerLatch = new OneShotLatch();
            this.closed = false;
            this.unblocked = false;
            this.exceptionally = false;
        }

        public void simulateBlockingOperation() throws IOException {
            while (!this.unblocked) {
                try {
                    this.blockerLatch.await();
                } catch (InterruptedException e) {
                    this.blockerLatch.reset();
                }
            }
            if (this.exceptionally) {
                throw new IOException("Closed in block");
            }
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            Preconditions.checkState(!this.closed);
            this.closed = true;
            unblockExceptionally();
        }

        public boolean isClosed() {
            return this.closed;
        }

        public void unblockExceptionally() {
            introduceException();
            unblock();
        }

        public void unblockSuccessfully() {
            unblock();
        }

        private void unblock() {
            this.unblocked = true;
            this.blockerLatch.trigger();
        }

        public void introduceException() {
            this.exceptionally = true;
        }

        public int getWaitersCount() {
            return this.blockerLatch.getWaitersCount();
        }
    }

    @Before
    public void setup() throws IOException {
        this.ownerRegistry = new CloseableRegistry();
        this.testProvidedResource = new TestBlockingCloseable();
        this.testBlocker = new TestBlockingCloseable();
        this.testAsyncSnapshotCallable = new TestAsyncSnapshotCallable(this.testProvidedResource, this.testBlocker);
        this.task = this.testAsyncSnapshotCallable.toAsyncSnapshotFutureTask(this.ownerRegistry);
        Assert.assertEquals(1L, this.ownerRegistry.getNumberOfRegisteredCloseables());
    }

    @After
    public void finalChecks() {
        Assert.assertTrue(this.testProvidedResource.isClosed());
        Assert.assertEquals(0L, this.ownerRegistry.getNumberOfRegisteredCloseables());
    }

    @Test
    public void testNormalRun() throws Exception {
        Thread startTask = startTask(this.task);
        while (this.testBlocker.getWaitersCount() < 1) {
            Thread.sleep(1L);
        }
        this.testBlocker.unblockSuccessfully();
        startTask.join();
        Assert.assertEquals(SUCCESS, this.task.get());
        Assert.assertEquals(Arrays.asList(METHOD_CALL, METHOD_LOG, METHOD_CLEANUP), this.testAsyncSnapshotCallable.getInvocationOrder());
        Assert.assertTrue(this.testBlocker.isClosed());
    }

    @Test
    public void testExceptionRun() throws Exception {
        this.testBlocker.introduceException();
        Thread startTask = startTask(this.task);
        while (this.testBlocker.getWaitersCount() < 1) {
            Thread.sleep(1L);
        }
        this.testBlocker.unblockSuccessfully();
        try {
            this.task.get();
            Assert.fail();
        } catch (ExecutionException e) {
            Assert.assertEquals(IOException.class, e.getCause().getClass());
        }
        startTask.join();
        Assert.assertEquals(Arrays.asList(METHOD_CALL, METHOD_CLEANUP), this.testAsyncSnapshotCallable.getInvocationOrder());
        Assert.assertTrue(this.testBlocker.isClosed());
    }

    @Test
    public void testCancelRun() throws Exception {
        Thread startTask = startTask(this.task);
        while (this.testBlocker.getWaitersCount() < 1) {
            Thread.sleep(1L);
        }
        this.task.cancel(true);
        this.testBlocker.unblockExceptionally();
        try {
            this.task.get();
            Assert.fail();
        } catch (CancellationException e) {
        }
        startTask.join();
        Assert.assertEquals(Arrays.asList(METHOD_CALL, METHOD_CANCEL, METHOD_CLEANUP), this.testAsyncSnapshotCallable.getInvocationOrder());
        Assert.assertTrue(this.testProvidedResource.isClosed());
        Assert.assertTrue(this.testBlocker.isClosed());
    }

    @Test
    public void testCloseRun() throws Exception {
        Thread startTask = startTask(this.task);
        while (this.testBlocker.getWaitersCount() < 1) {
            Thread.sleep(1L);
        }
        this.ownerRegistry.close();
        try {
            this.task.get();
            Assert.fail();
        } catch (CancellationException e) {
        }
        startTask.join();
        Assert.assertEquals(Arrays.asList(METHOD_CALL, METHOD_CANCEL, METHOD_CLEANUP), this.testAsyncSnapshotCallable.getInvocationOrder());
        Assert.assertTrue(this.testBlocker.isClosed());
    }

    @Test
    public void testCancelBeforeRun() throws Exception {
        this.task.cancel(true);
        Thread startTask = startTask(this.task);
        try {
            this.task.get();
            Assert.fail();
        } catch (CancellationException e) {
        }
        startTask.join();
        Assert.assertEquals(Arrays.asList(METHOD_CANCEL, METHOD_CLEANUP), this.testAsyncSnapshotCallable.getInvocationOrder());
        Assert.assertTrue(this.testProvidedResource.isClosed());
    }

    private Thread startTask(Runnable runnable) {
        Thread thread = new Thread(runnable);
        thread.start();
        return thread;
    }
}
