package io.trino.tests;

import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.trino.Session;
import io.trino.execution.QueryManager;
import io.trino.server.testing.TestingTrinoServer;
import io.trino.testing.DistributedQueryRunner;
import io.trino.testing.MaterializedResultWithQueryId;
import io.trino.testing.TestingSession;
import io.trino.tests.tpch.TpchQueryRunnerBuilder;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.assertj.core.api.Assertions;
import org.testng.Assert;
import org.testng.annotations.Test;

@Test(singleThreaded = true)
/* loaded from: input_file:io/trino/tests/TestMinWorkerRequirement.class */
public class TestMinWorkerRequirement {
    @Test(expectedExceptions = {RuntimeException.class}, expectedExceptionsMessageRegExp = "Insufficient active worker nodes. Waited 1.00ns for at least 5 workers, but only 4 workers are active")
    public void testInsufficientWorkerNodes() throws Exception {
        DistributedQueryRunner build = ((TpchQueryRunnerBuilder) ((TpchQueryRunnerBuilder) TpchQueryRunnerBuilder.builder().setCoordinatorProperties(ImmutableMap.builder().put("query-manager.required-workers", "5").put("query-manager.required-workers-max-wait", "1ns").buildOrThrow())).setNodeCount(4)).build();
        try {
            build.execute("SELECT COUNT(*) from lineitem");
            Assert.fail("Expected exception due to insufficient active worker nodes");
            if (build != null) {
                build.close();
            }
        } catch (Throwable th) {
            if (build != null) {
                try {
                    build.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test(expectedExceptions = {RuntimeException.class}, expectedExceptionsMessageRegExp = "Insufficient active worker nodes. Waited 1.00ns for at least 4 workers, but only 3 workers are active")
    public void testInsufficientWorkerNodesWithCoordinatorExcluded() throws Exception {
        DistributedQueryRunner build = ((TpchQueryRunnerBuilder) ((TpchQueryRunnerBuilder) TpchQueryRunnerBuilder.builder().setCoordinatorProperties(ImmutableMap.builder().put("node-scheduler.include-coordinator", "false").put("query-manager.required-workers", "4").put("query-manager.required-workers-max-wait", "1ns").buildOrThrow())).setNodeCount(4)).build();
        try {
            build.execute("SELECT COUNT(*) from lineitem");
            Assert.fail("Expected exception due to insufficient active worker nodes");
            if (build != null) {
                build.close();
            }
        } catch (Throwable th) {
            if (build != null) {
                try {
                    build.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testInsufficientWorkerNodesInternalSystemQuery() throws Exception {
        DistributedQueryRunner build = ((TpchQueryRunnerBuilder) ((TpchQueryRunnerBuilder) TpchQueryRunnerBuilder.builder().setCoordinatorProperties(ImmutableMap.builder().put("query-manager.required-workers", "5").put("query-manager.required-workers-max-wait", "1ns").buildOrThrow())).setNodeCount(4)).build();
        try {
            build.execute("SELECT 1");
            build.execute("DESCRIBE lineitem");
            build.execute("SHOW TABLES");
            build.execute("SHOW SCHEMAS");
            build.execute("SHOW CATALOGS");
            build.execute("SET SESSION required_workers_count=5");
            build.execute("SELECT * from system.runtime.nodes");
            build.execute("EXPLAIN SELECT count(*) from lineitem");
            if (build != null) {
                build.close();
            }
        } catch (Throwable th) {
            if (build != null) {
                try {
                    build.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testInsufficientWorkerNodesAfterDrop() throws Exception {
        DistributedQueryRunner build = ((TpchQueryRunnerBuilder) ((TpchQueryRunnerBuilder) TpchQueryRunnerBuilder.builder().setCoordinatorProperties(ImmutableMap.builder().put("query-manager.required-workers", "4").put("query-manager.required-workers-max-wait", "1ns").buildOrThrow())).setNodeCount(4)).build();
        try {
            build.execute("SELECT COUNT(*) from lineitem");
            Assert.assertEquals(build.getCoordinator().refreshNodes().getActiveNodes().size(), 4);
            ((TestingTrinoServer) build.getServers().get(0)).close();
            Assert.assertEquals(build.getCoordinator().refreshNodes().getActiveNodes().size(), 3);
            Assertions.assertThatThrownBy(() -> {
                build.execute("SELECT COUNT(*) from lineitem");
            }).isInstanceOf(RuntimeException.class).hasMessage("Insufficient active worker nodes. Waited 1.00ns for at least 4 workers, but only 3 workers are active");
            if (build != null) {
                build.close();
            }
        } catch (Throwable th) {
            if (build != null) {
                try {
                    build.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test(expectedExceptions = {RuntimeException.class}, expectedExceptionsMessageRegExp = "Insufficient active worker nodes. Waited 99.00ns for at least 3 workers, but only 2 workers are active")
    public void testRequiredNodesMaxWaitSessionOverride() throws Exception {
        DistributedQueryRunner build = ((TpchQueryRunnerBuilder) ((TpchQueryRunnerBuilder) TpchQueryRunnerBuilder.builder().setCoordinatorProperties(ImmutableMap.builder().put("query-manager.required-workers", "3").put("query-manager.required-workers-max-wait", "1ns").buildOrThrow())).setNodeCount(2)).build();
        try {
            build.execute(TestingSession.testSessionBuilder().setSystemProperty("required_workers_count", "3").setSystemProperty("required_workers_max_wait_time", "99ns").setCatalog("tpch").setSchema("tiny").build(), "SELECT COUNT(*) from lineitem");
            Assert.fail("Expected exception due to insufficient active worker nodes");
            if (build != null) {
                build.close();
            }
        } catch (Throwable th) {
            if (build != null) {
                try {
                    build.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testRequiredWorkerNodesSessionOverride() throws Exception {
        DistributedQueryRunner build = ((TpchQueryRunnerBuilder) ((TpchQueryRunnerBuilder) TpchQueryRunnerBuilder.builder().setCoordinatorProperties(ImmutableMap.builder().put("query-manager.required-workers", "5").put("query-manager.required-workers-max-wait", "1ns").buildOrThrow())).setNodeCount(4)).build();
        try {
            Session build2 = TestingSession.testSessionBuilder().setSystemProperty("required_workers_count", "4").setCatalog("tpch").setSchema("tiny").build();
            build.execute(build2, "SELECT COUNT(*) from lineitem");
            Session build3 = Session.builder(build2).setSystemProperty("required_workers_count", "6").build();
            Assertions.assertThatThrownBy(() -> {
                build.execute(build3, "SELECT COUNT(*) from lineitem");
            }).isInstanceOf(RuntimeException.class).hasMessage("Insufficient active worker nodes. Waited 1.00ns for at least 6 workers, but only 4 workers are active");
            build.addServers(2);
            Assert.assertEquals(build.getCoordinator().refreshNodes().getActiveNodes().size(), 6);
            build.execute(build3, "SELECT COUNT(*) from lineitem");
            if (build != null) {
                build.close();
            }
        } catch (Throwable th) {
            if (build != null) {
                try {
                    build.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testMultipleRequiredWorkerNodesSessionOverride() throws Exception {
        ListeningExecutorService listeningDecorator = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(3));
        try {
            DistributedQueryRunner build = ((TpchQueryRunnerBuilder) TpchQueryRunnerBuilder.builder().setNodeCount(1)).build();
            try {
                Session build2 = TestingSession.testSessionBuilder().setSystemProperty("required_workers_count", "2").setCatalog("tpch").setSchema("tiny").build();
                ListenableFuture submit = listeningDecorator.submit(() -> {
                    return build.executeWithQueryId(build2, "SELECT COUNT(*) from lineitem");
                });
                Session build3 = Session.builder(build2).setSystemProperty("required_workers_count", "3").build();
                ListenableFuture submit2 = listeningDecorator.submit(() -> {
                    return build.executeWithQueryId(build3, "SELECT COUNT(*) from lineitem");
                });
                Session build4 = Session.builder(build2).setSystemProperty("required_workers_count", "4").build();
                ListenableFuture submit3 = listeningDecorator.submit(() -> {
                    return build.executeWithQueryId(build4, "SELECT COUNT(*) from lineitem");
                });
                TimeUnit.MILLISECONDS.sleep(1000L);
                Assert.assertFalse(submit.isDone());
                Assert.assertFalse(submit2.isDone());
                Assert.assertFalse(submit3.isDone());
                build.addServers(1);
                Assert.assertEquals(build.getCoordinator().refreshNodes().getActiveNodes().size(), 2);
                TimeUnit.MILLISECONDS.sleep(1000L);
                Assert.assertTrue(((MaterializedResultWithQueryId) submit.get()).getResult().getRowCount() > 0);
                QueryManager queryManager = build.getCoordinator().getQueryManager();
                Assert.assertTrue(queryManager.getFullQueryInfo(((MaterializedResultWithQueryId) submit.get()).getQueryId()).getQueryStats().getResourceWaitingTime().roundTo(TimeUnit.SECONDS) >= 1);
                Assert.assertFalse(submit2.isDone());
                Assert.assertFalse(submit3.isDone());
                build.addServers(2);
                Assert.assertEquals(build.getCoordinator().refreshNodes().getActiveNodes().size(), 4);
                Assert.assertTrue(((MaterializedResultWithQueryId) submit2.get()).getResult().getRowCount() > 0);
                Assert.assertTrue(queryManager.getFullQueryInfo(((MaterializedResultWithQueryId) submit2.get()).getQueryId()).getQueryStats().getResourceWaitingTime().roundTo(TimeUnit.SECONDS) >= 2);
                Assert.assertTrue(((MaterializedResultWithQueryId) submit3.get()).getResult().getRowCount() > 0);
                Assert.assertTrue(queryManager.getFullQueryInfo(((MaterializedResultWithQueryId) submit3.get()).getQueryId()).getQueryStats().getResourceWaitingTime().roundTo(TimeUnit.SECONDS) >= 2);
                if (build != null) {
                    build.close();
                }
            } finally {
            }
        } finally {
            listeningDecorator.shutdown();
        }
    }
}
