package org.apache.flink.runtime.jobmaster;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.utils.JobMasterBuilder;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.UnknownKvStateLocation;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.assertj.core.api.Assertions;
import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/jobmaster/JobMasterQueryableStateTest.class */
/**
 * Tests for the queryable state (KvState) registration and lookup RPCs that are exposed through
 * the {@link JobMasterGateway}.
 *
 * <p>Every test starts a fresh {@link JobMaster} for the shared {@link #JOB_GRAPH}, waits until
 * a task has been submitted to the testing task executor, and then exercises the KvState
 * notification / lookup calls. The {@link JobMaster} is managed with try-with-resources so that
 * it is closed on success and on failure, and so that an exception thrown while closing never
 * hides the original test failure.
 */
public class JobMasterQueryableStateTest extends TestLogger {

    /** Timeout used for RPCs issued by the tests. */
    private static final Time testingTimeout = Time.seconds(10);

    /** Parallelism of both job vertices; also passed on when offering slots. */
    private static final int PARALLELISM = 4;

    /** Max parallelism configured on both vertices. */
    private static final int MAX_PARALLELISM = 16;

    private static final JobVertex JOB_VERTEX_1 = new JobVertex("v1");
    private static final JobVertex JOB_VERTEX_2 = new JobVertex("v2");
    private static final JobGraph JOB_GRAPH;

    private static TestingRpcService rpcService;

    static {
        JOB_VERTEX_1.setParallelism(PARALLELISM);
        JOB_VERTEX_1.setInvokableClass(AbstractInvokable.class);
        JOB_VERTEX_2.setParallelism(PARALLELISM);
        JOB_VERTEX_2.setInvokableClass(AbstractInvokable.class);

        JOB_VERTEX_1.setMaxParallelism(MAX_PARALLELISM);
        JOB_VERTEX_2.setMaxParallelism(MAX_PARALLELISM);

        // Both vertices share a single slot sharing group.
        SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
        JOB_VERTEX_1.setSlotSharingGroup(slotSharingGroup);
        JOB_VERTEX_2.setSlotSharingGroup(slotSharingGroup);

        JOB_GRAPH = JobGraphTestUtils.streamingJobGraph(JOB_VERTEX_1, JOB_VERTEX_2);
        JOB_GRAPH.setJobType(JobType.STREAMING);
    }

    @BeforeClass
    public static void setupClass() {
        rpcService = new TestingRpcService();
    }

    @After
    public void teardown() throws Exception {
        rpcService.clearGateways();
    }

    @AfterClass
    public static void teardownClass() {
        if (rpcService != null) {
            rpcService.stopService();
            rpcService = null;
        }
    }

    /** Looking up a state name that was never registered must fail with an {@link UnknownKvStateLocation}. */
    @Test
    public void testRequestKvStateWithoutRegistration() throws Exception {
        try (JobMaster jobMaster = new JobMasterBuilder(JOB_GRAPH, rpcService).createJobMaster()) {
            jobMaster.start();
            final JobMasterGateway gateway = jobMaster.getSelfGateway(JobMasterGateway.class);
            registerSlotsRequiredForJobExecution(gateway, JOB_GRAPH.getJobID());

            // The lookup has to happen inside the callable: an empty callable raises nothing
            // and makes assertThatThrownBy fail unconditionally.
            Assertions.assertThatThrownBy(
                            () -> gateway.requestKvStateLocation(JOB_GRAPH.getJobID(), "unknown").get())
                    .satisfies(FlinkAssertions.anyCauseMatches(UnknownKvStateLocation.class));
        }
    }

    /** Looking up state for a job ID other than the executed job must fail with a {@link FlinkJobNotFoundException}. */
    @Test
    public void testRequestKvStateOfWrongJob() throws Exception {
        try (JobMaster jobMaster = new JobMasterBuilder(JOB_GRAPH, rpcService).createJobMaster()) {
            jobMaster.start();
            final JobMasterGateway gateway = jobMaster.getSelfGateway(JobMasterGateway.class);
            registerSlotsRequiredForJobExecution(gateway, JOB_GRAPH.getJobID());

            // A freshly generated JobID differs from the ID of JOB_GRAPH.
            Assertions.assertThatThrownBy(
                            () -> gateway.requestKvStateLocation(new JobID(), "unknown").get())
                    .satisfies(FlinkAssertions.anyCauseMatches(FlinkJobNotFoundException.class));
        }
    }

    /** Registering state for an unrelated job ID must fail with a {@link FlinkJobNotFoundException}. */
    @Test
    public void testRequestKvStateWithIrrelevantRegistration() throws Exception {
        try (JobMaster jobMaster = new JobMasterBuilder(JOB_GRAPH, rpcService).createJobMaster()) {
            jobMaster.start();
            final JobMasterGateway gateway = jobMaster.getSelfGateway(JobMasterGateway.class);
            registerSlotsRequiredForJobExecution(gateway, JOB_GRAPH.getJobID());

            Assertions.assertThatThrownBy(
                            () -> registerKvState(gateway, new JobID(), new JobVertexID(), "any-name"))
                    .satisfies(FlinkAssertions.anyCauseMatches(FlinkJobNotFoundException.class));
        }
    }

    /** A registered KvState must be returned by a subsequent location lookup with matching metadata. */
    @Test
    public void testRegisterKvState() throws Exception {
        try (JobMaster jobMaster = new JobMasterBuilder(JOB_GRAPH, rpcService).createJobMaster()) {
            jobMaster.start();
            final JobMasterGateway gateway = jobMaster.getSelfGateway(JobMasterGateway.class);
            registerSlotsRequiredForJobExecution(gateway, JOB_GRAPH.getJobID());

            final String registrationName = "register-me";
            final KvStateID kvStateID = new KvStateID();
            final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 0);
            final InetSocketAddress address =
                    new InetSocketAddress(InetAddress.getLocalHost(), 1029);

            gateway.notifyKvStateRegistered(
                            JOB_GRAPH.getJobID(),
                            JOB_VERTEX_1.getID(),
                            keyGroupRange,
                            registrationName,
                            kvStateID,
                            address)
                    .get();

            final KvStateLocation location =
                    gateway.requestKvStateLocation(JOB_GRAPH.getJobID(), registrationName).get();

            Assert.assertEquals(JOB_GRAPH.getJobID(), location.getJobId());
            Assert.assertEquals(JOB_VERTEX_1.getID(), location.getJobVertexId());
            Assert.assertEquals(JOB_VERTEX_1.getMaxParallelism(), location.getNumKeyGroups());
            Assert.assertEquals(1L, location.getNumRegisteredKeyGroups());
            Assert.assertEquals(1L, keyGroupRange.getNumberOfKeyGroups());
            Assert.assertEquals(
                    kvStateID, location.getKvStateID(keyGroupRange.getStartKeyGroup()));
            Assert.assertEquals(
                    address, location.getKvStateServerAddress(keyGroupRange.getStartKeyGroup()));
        }
    }

    /** After unregistering a KvState, looking it up must fail with an {@link UnknownKvStateLocation}. */
    @Test
    public void testUnregisterKvState() throws Exception {
        try (JobMaster jobMaster = new JobMasterBuilder(JOB_GRAPH, rpcService).createJobMaster()) {
            jobMaster.start();
            final JobMasterGateway gateway = jobMaster.getSelfGateway(JobMasterGateway.class);
            registerSlotsRequiredForJobExecution(gateway, JOB_GRAPH.getJobID());

            final String registrationName = "register-me";
            final KvStateID kvStateID = new KvStateID();
            final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 0);
            final InetSocketAddress address =
                    new InetSocketAddress(InetAddress.getLocalHost(), 1029);

            gateway.notifyKvStateRegistered(
                            JOB_GRAPH.getJobID(),
                            JOB_VERTEX_1.getID(),
                            keyGroupRange,
                            registrationName,
                            kvStateID,
                            address)
                    .get();

            gateway.notifyKvStateUnregistered(
                            JOB_GRAPH.getJobID(),
                            JOB_VERTEX_1.getID(),
                            keyGroupRange,
                            registrationName)
                    .get();

            try {
                gateway.requestKvStateLocation(JOB_GRAPH.getJobID(), registrationName).get();
                // Assert.fail throws an AssertionError, which the catch below does not intercept.
                Assert.fail("Expected to fail with an UnknownKvStateLocation.");
            } catch (Exception e) {
                Assert.assertThat(e, FlinkMatchers.containsCause(UnknownKvStateLocation.class));
            }
        }
    }

    /**
     * Registering the same state name for two different vertices must be rejected with a
     * "Registration name clash" error, after which the job is reported as FAILED or FAILING.
     */
    @Test
    public void testDuplicatedKvStateRegistrationsFailTask() throws Exception {
        try (JobMaster jobMaster = new JobMasterBuilder(JOB_GRAPH, rpcService).createJobMaster()) {
            jobMaster.start();
            final JobMasterGateway gateway = jobMaster.getSelfGateway(JobMasterGateway.class);
            registerSlotsRequiredForJobExecution(gateway, JOB_GRAPH.getJobID());

            final String registrationName = "duplicate-me";
            registerKvState(gateway, JOB_GRAPH.getJobID(), JOB_VERTEX_1.getID(), registrationName);

            try {
                registerKvState(
                        gateway, JOB_GRAPH.getJobID(), JOB_VERTEX_2.getID(), registrationName);
                // Assert.fail throws an AssertionError, which the catch below does not intercept.
                Assert.fail("Expected to fail because of clashing registration message.");
            } catch (Exception e) {
                Assert.assertTrue(
                        ExceptionUtils.findThrowableWithMessage(e, "Registration name clash")
                                .isPresent());
                Assert.assertThat(
                        gateway.requestJobStatus(testingTimeout).get(),
                        CoreMatchers.either(CoreMatchers.is(JobStatus.FAILED))
                                .or(CoreMatchers.is(JobStatus.FAILING)));
            }
        }
    }

    /**
     * Registers a testing task executor with the given job master, offers slots to it and blocks
     * until the task executor has received its first task submission.
     *
     * @param jobMasterGateway gateway of the job master to register at
     * @param jobID ID of the job the slots are offered for
     * @throws ExecutionException if the registration / slot offering fails
     * @throws InterruptedException if interrupted while waiting
     */
    private static void registerSlotsRequiredForJobExecution(JobMasterGateway jobMasterGateway, JobID jobID) throws ExecutionException, InterruptedException {
        final OneShotLatch taskSubmitted = new OneShotLatch();

        JobMasterTestUtils.registerTaskExecutorAndOfferSlots(
                rpcService,
                jobMasterGateway,
                jobID,
                PARALLELISM,
                new TestingTaskExecutorGatewayBuilder()
                        .setSubmitTaskConsumer(
                                (taskDeploymentDescriptor, jobMasterId) -> {
                                    // Signal the waiting test thread and acknowledge the submission.
                                    taskSubmitted.trigger();
                                    return CompletableFuture.completedFuture(Acknowledge.get());
                                })
                        .createTestingTaskExecutorGateway(),
                testingTimeout);

        taskSubmitted.await();
    }

    /**
     * Sends a KvState registration for key group range {@code [0, 0]} with a fresh
     * {@link KvStateID} and a local server address on port 1233, and waits for the result.
     *
     * @param kvStateRegistryGateway gateway that receives the registration
     * @param jobID job the state is registered for
     * @param jobVertexID vertex the state is registered for
     * @param str registration name of the state
     * @throws UnknownHostException if the local host address cannot be resolved
     * @throws ExecutionException if the registration is rejected
     * @throws InterruptedException if interrupted while waiting for the result
     */
    public static void registerKvState(KvStateRegistryGateway kvStateRegistryGateway, JobID jobID, JobVertexID jobVertexID, String str) throws UnknownHostException, ExecutionException, InterruptedException {
        final InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(), 1233);
        kvStateRegistryGateway
                .notifyKvStateRegistered(
                        jobID, jobVertexID, new KeyGroupRange(0, 0), str, new KvStateID(), address)
                .get();
    }
}
