package com.ververica.cdc.connectors.mysql.source;

import com.ververica.cdc.connectors.mysql.testutils.MySqlContainer;
import com.ververica.cdc.connectors.mysql.testutils.MySqlVersion;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;

/* loaded from: input_file:com/ververica/cdc/connectors/mysql/source/MySqlSourceTestBase.class */
public abstract class MySqlSourceTestBase extends TestLogger {
    protected static final int DEFAULT_PARALLELISM = 4;

    @Rule
    public final MiniClusterWithClientResource miniClusterResource = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(1).setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM).setRpcServiceSharing(RpcServiceSharing.DEDICATED).withHaLeadershipControl().build());
    protected static final Logger LOG = LoggerFactory.getLogger(MySqlSourceTestBase.class);
    protected static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V5_7);

    /* loaded from: input_file:com/ververica/cdc/connectors/mysql/source/MySqlSourceTestBase$FailoverPhase.class */
    protected enum FailoverPhase {
        SNAPSHOT,
        BINLOG,
        NEVER
    }

    /* loaded from: input_file:com/ververica/cdc/connectors/mysql/source/MySqlSourceTestBase$FailoverType.class */
    protected enum FailoverType {
        TM,
        JM,
        NONE
    }

    @BeforeClass
    public static void startContainers() {
        LOG.info("Starting containers...");
        Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
        LOG.info("Containers are started.");
    }

    @AfterClass
    public static void stopContainers() {
        LOG.info("Stopping containers...");
        MYSQL_CONTAINER.stop();
        LOG.info("Containers are stopped.");
    }

    protected static MySqlContainer createMySqlContainer(MySqlVersion mySqlVersion) {
        return createMySqlContainer(mySqlVersion, "docker/server-gtids/my.cnf");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static MySqlContainer createMySqlContainer(MySqlVersion mySqlVersion, String str) {
        return new MySqlContainer(mySqlVersion).withConfigurationOverride(str).withSetupSQL("docker/setup.sql").m21withDatabaseName("flink-test").m23withUsername("flinkuser").m22withPassword("flinkpw").withLogConsumer(new Slf4jLogConsumer(LOG));
    }

    public static void assertEqualsInAnyOrder(List<String> list, List<String> list2) {
        Assert.assertTrue((list == null || list2 == null) ? false : true);
        assertEqualsInOrder((List) list.stream().sorted().collect(Collectors.toList()), (List) list2.stream().sorted().collect(Collectors.toList()));
    }

    public static void assertEqualsInOrder(List<String> list, List<String> list2) {
        Assert.assertTrue((list == null || list2 == null) ? false : true);
        Assert.assertEquals(list.size(), list2.size());
        Assert.assertArrayEquals(list.toArray(new String[0]), list2.toArray(new String[0]));
    }

    public static void assertMapEquals(Map<String, ?> map, Map<String, ?> map2) {
        Assert.assertTrue((map == null || map2 == null) ? false : true);
        Assert.assertEquals(map.size(), map2.size());
        for (String str : map.keySet()) {
            Assert.assertEquals(map.get(str), map2.get(str));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static void triggerFailover(FailoverType failoverType, JobID jobID, MiniCluster miniCluster, Runnable runnable) throws Exception {
        switch (failoverType) {
            case TM:
                restartTaskManager(miniCluster, runnable);
                return;
            case JM:
                triggerJobManagerFailover(jobID, miniCluster, runnable);
                return;
            case NONE:
                return;
            default:
                throw new IllegalStateException("Unexpected value: " + failoverType);
        }
    }

    protected static void triggerJobManagerFailover(JobID jobID, MiniCluster miniCluster, Runnable runnable) throws Exception {
        HaLeadershipControl haLeadershipControl = (HaLeadershipControl) miniCluster.getHaLeadershipControl().get();
        haLeadershipControl.revokeJobMasterLeadership(jobID).get();
        runnable.run();
        haLeadershipControl.grantJobMasterLeadership(jobID).get();
    }

    protected static void restartTaskManager(MiniCluster miniCluster, Runnable runnable) throws Exception {
        miniCluster.terminateTaskManager(0).get();
        runnable.run();
        miniCluster.startTaskManager();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static void waitForUpsertSinkSize(String str, int i) throws InterruptedException {
        while (upsertSinkSize(str) < i) {
            Thread.sleep(100L);
        }
    }

    protected static int upsertSinkSize(String str) {
        int size;
        synchronized (TestValuesTableFactory.class) {
            try {
                size = TestValuesTableFactory.getResults(str).size();
            } catch (IllegalArgumentException e) {
                return 0;
            }
        }
        return size;
    }
}
