package org.apache.flink.runtime.jobmaster;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.utils.JobMasterBuilder;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.UnknownKvStateLocation;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/jobmaster/JobMasterQueryableStateTest.class */
class JobMasterQueryableStateTest {
    private static final Duration testingTimeout;
    private static TestingRpcService rpcService;
    private static final int PARALLELISM = 4;
    private static final JobVertex JOB_VERTEX_1;
    private static final JobVertex JOB_VERTEX_2;
    private static final JobGraph JOB_GRAPH;
    static final /* synthetic */ boolean $assertionsDisabled;

    JobMasterQueryableStateTest() {
    }

    @BeforeAll
    private static void setupClass() {
        rpcService = new TestingRpcService();
    }

    @AfterEach
    private void teardown() throws Exception {
        rpcService.clearGateways();
    }

    @AfterAll
    private static void teardownClass() {
        if (rpcService != null) {
            rpcService.closeAsync();
            rpcService = null;
        }
    }

    @Test
    void testRequestKvStateWithoutRegistration() throws Exception {
        JobMaster createJobMaster = new JobMasterBuilder(JOB_GRAPH, rpcService).createJobMaster();
        try {
            createJobMaster.start();
            JobMasterGateway selfGateway = createJobMaster.getSelfGateway(JobMasterGateway.class);
            registerSlotsRequiredForJobExecution(selfGateway, JOB_GRAPH.getJobID());
            Assertions.assertThatThrownBy(() -> {
                selfGateway.requestKvStateLocation(JOB_GRAPH.getJobID(), "unknown").get();
            }).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(UnknownKvStateLocation.class)});
            if (createJobMaster != null) {
                createJobMaster.close();
            }
        } catch (Throwable th) {
            if (createJobMaster != null) {
                try {
                    createJobMaster.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    void testRequestKvStateOfWrongJob() throws Exception {
        JobMaster createJobMaster = new JobMasterBuilder(JOB_GRAPH, rpcService).createJobMaster();
        try {
            createJobMaster.start();
            JobMasterGateway selfGateway = createJobMaster.getSelfGateway(JobMasterGateway.class);
            registerSlotsRequiredForJobExecution(selfGateway, JOB_GRAPH.getJobID());
            Assertions.assertThatThrownBy(() -> {
                selfGateway.requestKvStateLocation(new JobID(), "unknown").get();
            }).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(FlinkJobNotFoundException.class)});
            if (createJobMaster != null) {
                createJobMaster.close();
            }
        } catch (Throwable th) {
            if (createJobMaster != null) {
                try {
                    createJobMaster.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    void testRequestKvStateWithIrrelevantRegistration() throws Exception {
        JobMaster createJobMaster = new JobMasterBuilder(JOB_GRAPH, rpcService).createJobMaster();
        try {
            createJobMaster.start();
            JobMasterGateway selfGateway = createJobMaster.getSelfGateway(JobMasterGateway.class);
            registerSlotsRequiredForJobExecution(selfGateway, JOB_GRAPH.getJobID());
            Assertions.assertThatThrownBy(() -> {
                registerKvState(selfGateway, new JobID(), new JobVertexID(), "any-name");
            }).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(FlinkJobNotFoundException.class)});
            if (createJobMaster != null) {
                createJobMaster.close();
            }
        } catch (Throwable th) {
            if (createJobMaster != null) {
                try {
                    createJobMaster.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    void testRegisterKvState() throws Exception {
        JobMaster createJobMaster = new JobMasterBuilder(JOB_GRAPH, rpcService).createJobMaster();
        try {
            createJobMaster.start();
            JobMasterGateway selfGateway = createJobMaster.getSelfGateway(JobMasterGateway.class);
            registerSlotsRequiredForJobExecution(selfGateway, JOB_GRAPH.getJobID());
            KvStateID kvStateID = new KvStateID();
            KeyGroupRange keyGroupRange = new KeyGroupRange(0, 0);
            InetSocketAddress inetSocketAddress = new InetSocketAddress(InetAddress.getLocalHost(), 1029);
            selfGateway.notifyKvStateRegistered(JOB_GRAPH.getJobID(), JOB_VERTEX_1.getID(), keyGroupRange, "register-me", kvStateID, inetSocketAddress).get();
            KvStateLocation kvStateLocation = (KvStateLocation) selfGateway.requestKvStateLocation(JOB_GRAPH.getJobID(), "register-me").get();
            Assertions.assertThat(kvStateLocation.getJobId()).isEqualTo(JOB_GRAPH.getJobID());
            Assertions.assertThat(kvStateLocation.getJobVertexId()).isEqualTo(JOB_VERTEX_1.getID());
            Assertions.assertThat(kvStateLocation.getNumKeyGroups()).isEqualTo(JOB_VERTEX_1.getMaxParallelism());
            Assertions.assertThat(kvStateLocation.getNumRegisteredKeyGroups()).isOne();
            Assertions.assertThat(keyGroupRange.getNumberOfKeyGroups()).isOne();
            Assertions.assertThat(kvStateLocation.getKvStateID(keyGroupRange.getStartKeyGroup())).isEqualTo(kvStateID);
            Assertions.assertThat(kvStateLocation.getKvStateServerAddress(keyGroupRange.getStartKeyGroup())).isEqualTo(inetSocketAddress);
            if (createJobMaster != null) {
                createJobMaster.close();
            }
        } catch (Throwable th) {
            if (createJobMaster != null) {
                try {
                    createJobMaster.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    void testUnregisterKvState() throws Exception {
        JobMaster createJobMaster = new JobMasterBuilder(JOB_GRAPH, rpcService).createJobMaster();
        try {
            createJobMaster.start();
            JobMasterGateway selfGateway = createJobMaster.getSelfGateway(JobMasterGateway.class);
            registerSlotsRequiredForJobExecution(selfGateway, JOB_GRAPH.getJobID());
            KvStateID kvStateID = new KvStateID();
            KeyGroupRange keyGroupRange = new KeyGroupRange(0, 0);
            selfGateway.notifyKvStateRegistered(JOB_GRAPH.getJobID(), JOB_VERTEX_1.getID(), keyGroupRange, "register-me", kvStateID, new InetSocketAddress(InetAddress.getLocalHost(), 1029)).get();
            selfGateway.notifyKvStateUnregistered(JOB_GRAPH.getJobID(), JOB_VERTEX_1.getID(), keyGroupRange, "register-me").get();
            Assertions.assertThatThrownBy(() -> {
                selfGateway.requestKvStateLocation(JOB_GRAPH.getJobID(), "register-me").get();
            }).as("Expected to fail with an UnknownKvStateLocation.", new Object[0]).isInstanceOf(Exception.class).hasCauseInstanceOf(UnknownKvStateLocation.class);
            if (createJobMaster != null) {
                createJobMaster.close();
            }
        } catch (Throwable th) {
            if (createJobMaster != null) {
                try {
                    createJobMaster.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    void testDuplicatedKvStateRegistrationsFailTask() throws Exception {
        JobMaster createJobMaster = new JobMasterBuilder(JOB_GRAPH, rpcService).createJobMaster();
        try {
            createJobMaster.start();
            JobMasterGateway selfGateway = createJobMaster.getSelfGateway(JobMasterGateway.class);
            registerSlotsRequiredForJobExecution(selfGateway, JOB_GRAPH.getJobID());
            registerKvState(selfGateway, JOB_GRAPH.getJobID(), JOB_VERTEX_1.getID(), "duplicate-me");
            Assertions.assertThatThrownBy(() -> {
                registerKvState(selfGateway, JOB_GRAPH.getJobID(), JOB_VERTEX_2.getID(), "duplicate-me");
            }).as("Expected to fail because of clashing registration message.", new Object[0]).isInstanceOf(Exception.class).hasMessageContaining("Registration name clash");
            Assertions.assertThat((JobStatus) selfGateway.requestJobStatus(testingTimeout).get()).satisfies(new Consumer[]{jobStatus -> {
                if (!$assertionsDisabled && jobStatus != JobStatus.FAILED && jobStatus != JobStatus.FAILING) {
                    throw new AssertionError();
                }
            }});
            if (createJobMaster != null) {
                createJobMaster.close();
            }
        } catch (Throwable th) {
            if (createJobMaster != null) {
                try {
                    createJobMaster.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private static void registerSlotsRequiredForJobExecution(JobMasterGateway jobMasterGateway, JobID jobID) throws ExecutionException, InterruptedException {
        OneShotLatch oneShotLatch = new OneShotLatch();
        JobMasterTestUtils.registerTaskExecutorAndOfferSlots(rpcService, jobMasterGateway, jobID, 4, new TestingTaskExecutorGatewayBuilder().setSubmitTaskConsumer((taskDeploymentDescriptor, jobMasterId) -> {
            oneShotLatch.trigger();
            return CompletableFuture.completedFuture(Acknowledge.get());
        }).createTestingTaskExecutorGateway(), testingTimeout);
        oneShotLatch.await();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void registerKvState(KvStateRegistryGateway kvStateRegistryGateway, JobID jobID, JobVertexID jobVertexID, String str) throws UnknownHostException, ExecutionException, InterruptedException {
        kvStateRegistryGateway.notifyKvStateRegistered(jobID, jobVertexID, new KeyGroupRange(0, 0), str, new KvStateID(), new InetSocketAddress(InetAddress.getLocalHost(), 1233)).get();
    }

    static {
        $assertionsDisabled = !JobMasterQueryableStateTest.class.desiredAssertionStatus();
        testingTimeout = Duration.ofSeconds(10L);
        JOB_VERTEX_1 = new JobVertex("v1");
        JOB_VERTEX_1.setParallelism(4);
        JOB_VERTEX_1.setInvokableClass(AbstractInvokable.class);
        JOB_VERTEX_2 = new JobVertex("v2");
        JOB_VERTEX_2.setParallelism(4);
        JOB_VERTEX_2.setInvokableClass(AbstractInvokable.class);
        JOB_VERTEX_1.setMaxParallelism(16);
        JOB_VERTEX_2.setMaxParallelism(16);
        SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
        JOB_VERTEX_1.setSlotSharingGroup(slotSharingGroup);
        JOB_VERTEX_2.setSlotSharingGroup(slotSharingGroup);
        JOB_GRAPH = JobGraphTestUtils.streamingJobGraph(JOB_VERTEX_1, JOB_VERTEX_2);
        JOB_GRAPH.setJobType(JobType.STREAMING);
    }
}
