/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.gateway.service.operation;

import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ResultKind;
import org.apache.flink.table.api.internal.StaticResultProvider;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.operation.OperationStatus;
import org.apache.flink.table.gateway.api.results.ResultSet;
import org.apache.flink.table.gateway.api.results.ResultSetImpl;
import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
import org.apache.flink.table.gateway.service.operation.OperationManager;
import org.apache.flink.table.gateway.service.utils.IgnoreExceptionHandler;
import org.apache.flink.table.gateway.service.utils.SqlCancelException;
import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
import org.apache.flink.table.types.DataType;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class OperationManagerTest {
    private static OperationManager operationManager;
    private static ResultSet defaultResultSet;
    @RegisterExtension
    private static final TestExecutorExtension<ExecutorService> EXECUTOR_EXTENSION;

    OperationManagerTest() {
    }

    @BeforeEach
    void setUp() {
        operationManager = new OperationManager(EXECUTOR_EXTENSION.getExecutor());
        defaultResultSet = new ResultSetImpl(ResultSet.ResultType.PAYLOAD, Long.valueOf(1L), ResolvedSchema.of((Column[])new Column[]{Column.physical((String)"id", (DataType)DataTypes.BIGINT())}), Collections.singletonList(GenericRowData.of((Object[])new Object[]{1L})), StaticResultProvider.SIMPLE_ROW_DATA_TO_STRING_CONVERTER, false, null, ResultKind.SUCCESS_WITH_CONTENT);
    }

    @AfterEach
    void cleanEach() {
        operationManager.close();
    }

    @Test
    void testRunOperationAsynchronously() throws Exception {
        OperationHandle operationHandle = operationManager.submitOperation(() -> defaultResultSet);
        Assertions.assertThat((Comparable)operationManager.getOperationInfo(operationHandle).getStatus()).isNotEqualTo((Object)OperationStatus.ERROR);
        Assertions.assertThat((Object)operationManager.getOperationResultSchema(operationHandle)).isEqualTo((Object)ResolvedSchema.of((Column[])new Column[]{Column.physical((String)"id", (DataType)DataTypes.BIGINT())}));
        Assertions.assertThat((Comparable)operationManager.getOperationInfo(operationHandle).getStatus()).isEqualTo((Object)OperationStatus.FINISHED);
    }

    @Test
    void testRunOperationSynchronously() throws Exception {
        OperationHandle operationHandle = operationManager.submitOperation(() -> defaultResultSet);
        operationManager.awaitOperationTermination(operationHandle);
        Assertions.assertThat((Comparable)operationManager.getOperationInfo(operationHandle).getStatus()).isEqualTo((Object)OperationStatus.FINISHED);
        Assertions.assertThat((Object)operationManager.fetchResults(operationHandle, 0L, Integer.MAX_VALUE)).isEqualTo((Object)defaultResultSet);
    }

    @Test
    void testCancelOperation() throws Exception {
        CountDownLatch endRunningLatch = new CountDownLatch(1);
        OperationHandle operationHandle = operationManager.submitOperation(() -> {
            endRunningLatch.await();
            return defaultResultSet;
        });
        EXECUTOR_EXTENSION.getExecutor().submit(() -> operationManager.cancelOperation(operationHandle));
        operationManager.awaitOperationTermination(operationHandle);
        Assertions.assertThat((Comparable)operationManager.getOperationInfo(operationHandle).getStatus()).isEqualTo((Object)OperationStatus.CANCELED);
    }

    @Test
    void testCancelUninterruptedOperation() throws Exception {
        AtomicReference<Boolean> isRunning = new AtomicReference<Boolean>(false);
        OperationHandle operationHandle = operationManager.submitOperation(() -> {
            while (true) {
                isRunning.compareAndSet(false, true);
            }
        });
        CommonTestUtils.waitUtil(isRunning::get, (Duration)Duration.ofSeconds(10L), (String)"Failed to start up the task.");
        Assertions.assertThatThrownBy(() -> operationManager.cancelOperation(operationHandle)).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(SqlCancelException.class, (String)String.format("Operation '%s' did not react to \"Future.cancel(true)\" and is stuck for %s seconds in method.\n", operationHandle, 5))});
        Assertions.assertThat((Comparable)operationManager.getOperationInfo(operationHandle).getStatus()).isEqualTo((Object)OperationStatus.CANCELED);
    }

    @Test
    void testCloseUninterruptedOperation() throws Exception {
        AtomicReference<Boolean> isRunning = new AtomicReference<Boolean>(false);
        for (int i = 0; i < 10; ++i) {
            EXECUTOR_EXTENSION.getExecutor().submit(() -> operationManager.submitOperation(() -> {
                while (true) {
                    isRunning.compareAndSet(false, true);
                }
            }));
        }
        CommonTestUtils.waitUtil(isRunning::get, (Duration)Duration.ofSeconds(10L), (String)"Failed to start up the task.");
        Assertions.assertThatThrownBy(() -> operationManager.close()).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(SqlCancelException.class)});
        Assertions.assertThat((int)operationManager.getOperationCount()).isEqualTo(0);
    }

    @Test
    void testCloseOperation() {
        CountDownLatch endRunningLatch = new CountDownLatch(1);
        OperationHandle operationHandle = operationManager.submitOperation(() -> {
            endRunningLatch.await();
            return defaultResultSet;
        });
        EXECUTOR_EXTENSION.getExecutor().submit(() -> operationManager.closeOperation(operationHandle));
        Assertions.assertThatThrownBy(() -> {
            operationManager.awaitOperationTermination(operationHandle);
            operationManager.getOperation(operationHandle);
        }).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(SqlGatewayException.class, (String)String.format("Can not find the submitted operation in the OperationManager with the %s.", operationHandle))});
    }

    @Test
    void testRunOperationSynchronouslyWithError() {
        OperationHandle operationHandle = operationManager.submitOperation(() -> {
            throw new SqlExecutionException("Execution error.");
        });
        Assertions.assertThatThrownBy(() -> operationManager.awaitOperationTermination(operationHandle)).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(SqlExecutionException.class, (String)"Execution error.")});
        Assertions.assertThat((Comparable)operationManager.getOperationInfo(operationHandle).getStatus()).isEqualTo((Object)OperationStatus.ERROR);
    }

    static {
        EXECUTOR_EXTENSION = new TestExecutorExtension(() -> Executors.newCachedThreadPool((ThreadFactory)new ExecutorThreadFactory("SqlGatewayService Test Pool", (Thread.UncaughtExceptionHandler)IgnoreExceptionHandler.INSTANCE)));
    }
}

