/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import javax.annotation.Nonnull;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.state.AsyncSnapshotCallable;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class AsyncSnapshotCallableTest {
    private static final String METHOD_CALL = "callInternal";
    private static final String METHOD_LOG = "logAsyncSnapshotComplete";
    private static final String METHOD_CLEANUP = "cleanupProvidedResources";
    private static final String METHOD_CANCEL = "cancel";
    private static final String SUCCESS = "Success!";
    private CloseableRegistry ownerRegistry;
    private TestBlockingCloseable testProvidedResource;
    private TestBlockingCloseable testBlocker;
    private TestAsyncSnapshotCallable testAsyncSnapshotCallable;
    private FutureTask<String> task;

    AsyncSnapshotCallableTest() {
    }

    @BeforeEach
    void setup() throws IOException {
        this.ownerRegistry = new CloseableRegistry();
        this.testProvidedResource = new TestBlockingCloseable();
        this.testBlocker = new TestBlockingCloseable();
        this.testAsyncSnapshotCallable = new TestAsyncSnapshotCallable(this.testProvidedResource, this.testBlocker);
        this.task = this.testAsyncSnapshotCallable.toAsyncSnapshotFutureTask(this.ownerRegistry);
        Assertions.assertThat((int)this.ownerRegistry.getNumberOfRegisteredCloseables()).isOne();
    }

    @AfterEach
    void finalChecks() {
        Assertions.assertThat((boolean)this.testProvidedResource.isClosed()).isTrue();
        Assertions.assertThat((int)this.ownerRegistry.getNumberOfRegisteredCloseables()).isZero();
    }

    @Test
    void testNormalRun() throws Exception {
        Thread runner = this.startTask(this.task);
        while (this.testBlocker.getWaitersCount() < 1) {
            Thread.sleep(1L);
        }
        this.testBlocker.unblockSuccessfully();
        runner.join();
        Assertions.assertThat((String)this.task.get()).isEqualTo(SUCCESS);
        Assertions.assertThat(this.testAsyncSnapshotCallable.getInvocationOrder()).containsExactly((Object[])new String[]{METHOD_CALL, METHOD_LOG, METHOD_CLEANUP});
        Assertions.assertThat((boolean)this.testBlocker.isClosed()).isTrue();
    }

    @Test
    void testExceptionRun() throws Exception {
        this.testBlocker.introduceException();
        Thread runner = this.startTask(this.task);
        while (this.testBlocker.getWaitersCount() < 1) {
            Thread.sleep(1L);
        }
        this.testBlocker.unblockSuccessfully();
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(this.task::get).isInstanceOf(ExecutionException.class)).hasCauseInstanceOf(IOException.class);
        runner.join();
        Assertions.assertThat(this.testAsyncSnapshotCallable.getInvocationOrder()).containsExactly((Object[])new String[]{METHOD_CALL, METHOD_CLEANUP});
        Assertions.assertThat((boolean)this.testBlocker.isClosed()).isTrue();
    }

    @Test
    void testCancelRun() throws Exception {
        Thread runner = this.startTask(this.task);
        while (this.testBlocker.getWaitersCount() < 1) {
            Thread.sleep(1L);
        }
        this.task.cancel(true);
        this.testBlocker.unblockExceptionally();
        Assertions.assertThatThrownBy(this.task::get).isInstanceOf(CancellationException.class);
        runner.join();
        Assertions.assertThat(this.testAsyncSnapshotCallable.getInvocationOrder()).containsExactly((Object[])new String[]{METHOD_CALL, METHOD_CANCEL, METHOD_CLEANUP});
        Assertions.assertThat((boolean)this.testProvidedResource.isClosed()).isTrue();
        Assertions.assertThat((boolean)this.testBlocker.isClosed()).isTrue();
    }

    @Test
    void testCloseRun() throws Exception {
        Thread runner = this.startTask(this.task);
        while (this.testBlocker.getWaitersCount() < 1) {
            Thread.sleep(1L);
        }
        this.ownerRegistry.close();
        Assertions.assertThatThrownBy(this.task::get).isInstanceOf(CancellationException.class);
        runner.join();
        Assertions.assertThat(this.testAsyncSnapshotCallable.getInvocationOrder()).containsExactly((Object[])new String[]{METHOD_CALL, METHOD_CANCEL, METHOD_CLEANUP});
        Assertions.assertThat((boolean)this.testBlocker.isClosed()).isTrue();
    }

    @Test
    void testCancelBeforeRun() throws Exception {
        this.task.cancel(true);
        Thread runner = this.startTask(this.task);
        Assertions.assertThatThrownBy(this.task::get).isInstanceOf(CancellationException.class);
        runner.join();
        Assertions.assertThat(this.testAsyncSnapshotCallable.getInvocationOrder()).containsExactly((Object[])new String[]{METHOD_CANCEL, METHOD_CLEANUP});
        Assertions.assertThat((boolean)this.testProvidedResource.isClosed()).isTrue();
    }

    private Thread startTask(Runnable task) {
        Thread runner = new Thread(task);
        runner.start();
        return runner;
    }

    private static class TestBlockingCloseable
    implements Closeable {
        private final OneShotLatch blockerLatch = new OneShotLatch();
        private boolean closed = false;
        private boolean unblocked = false;
        private boolean exceptionally = false;

        private TestBlockingCloseable() {
        }

        public void simulateBlockingOperation() throws IOException {
            while (!this.unblocked) {
                try {
                    this.blockerLatch.await();
                }
                catch (InterruptedException e) {
                    this.blockerLatch.reset();
                }
            }
            if (this.exceptionally) {
                throw new IOException("Closed in block");
            }
        }

        @Override
        public void close() {
            Preconditions.checkState((!this.closed ? 1 : 0) != 0);
            this.closed = true;
            this.unblockExceptionally();
        }

        public boolean isClosed() {
            return this.closed;
        }

        public void unblockExceptionally() {
            this.introduceException();
            this.unblock();
        }

        public void unblockSuccessfully() {
            this.unblock();
        }

        private void unblock() {
            this.unblocked = true;
            this.blockerLatch.trigger();
        }

        public void introduceException() {
            this.exceptionally = true;
        }

        public int getWaitersCount() {
            return this.blockerLatch.getWaitersCount();
        }
    }

    private static class TestAsyncSnapshotCallable
    extends AsyncSnapshotCallable<String> {
        @Nonnull
        private final TestBlockingCloseable providedResource;
        @Nonnull
        private final TestBlockingCloseable blockingResource;
        @Nonnull
        private final List<String> invocationOrder;

        TestAsyncSnapshotCallable(@Nonnull TestBlockingCloseable providedResource, @Nonnull TestBlockingCloseable blockingResource) {
            this.providedResource = providedResource;
            this.blockingResource = blockingResource;
            this.invocationOrder = new ArrayList<String>();
        }

        protected String callInternal() throws Exception {
            this.addInvocation(AsyncSnapshotCallableTest.METHOD_CALL);
            this.snapshotCloseableRegistry.registerCloseable((AutoCloseable)this.blockingResource);
            try {
                this.blockingResource.simulateBlockingOperation();
            }
            finally {
                if (this.snapshotCloseableRegistry.unregisterCloseable((AutoCloseable)this.blockingResource)) {
                    this.blockingResource.close();
                }
            }
            return AsyncSnapshotCallableTest.SUCCESS;
        }

        protected void cleanupProvidedResources() {
            this.addInvocation(AsyncSnapshotCallableTest.METHOD_CLEANUP);
            this.providedResource.close();
        }

        protected void logAsyncSnapshotComplete(long startTime) {
            this.invocationOrder.add(AsyncSnapshotCallableTest.METHOD_LOG);
        }

        protected void cancel() {
            this.addInvocation(AsyncSnapshotCallableTest.METHOD_CANCEL);
            super.cancel();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Nonnull
        public List<String> getInvocationOrder() {
            List<String> list = this.invocationOrder;
            synchronized (list) {
                return new ArrayList<String>(this.invocationOrder);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void addInvocation(@Nonnull String invocation) {
            List<String> list = this.invocationOrder;
            synchronized (list) {
                this.invocationOrder.add(invocation);
            }
        }
    }
}

