package org.apache.flink.runtime.taskexecutor;

import java.util.concurrent.CountDownLatch;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.taskexecutor.JobTable;
import org.apache.flink.runtime.taskexecutor.TestingJobServices;
import org.apache.flink.runtime.taskmanager.NoOpCheckpointResponder;
import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.SupplierWithException;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/taskexecutor/DefaultJobTableTest.class */
/**
 * Tests for {@link DefaultJobTable}.
 *
 * <p>Every test runs against a fresh table created in {@link #setup()} and closed again in
 * {@link #teardown()}. Test names follow the {@code action_Precondition_ExpectedOutcome} pattern.
 */
public class DefaultJobTableTest extends TestLogger {

    /** Supplies a default {@link TestingJobServices} instance for tests that do not inspect it. */
    private static final SupplierWithException<JobTable.JobServices, RuntimeException> DEFAULT_JOB_SERVICES_SUPPLIER =
            () -> TestingJobServices.newBuilder().build();

    /** Job id shared by all tests of a single test instance. */
    private final JobID jobId = new JobID();

    /** Table under test; re-created before each test. */
    private DefaultJobTable jobTable;

    @Before
    public void setup() {
        jobTable = DefaultJobTable.create();
    }

    @After
    public void teardown() {
        // Guard against setup() having failed before the table was assigned.
        if (jobTable != null) {
            jobTable.close();
        }
    }

    /** Requesting an unknown job id yields a job with that id which can then be looked up. */
    @Test
    public void getOrCreateJob_NoRegisteredJob_WillCreateNewJob() {
        final JobTable.Job createdJob = jobTable.getOrCreateJob(jobId, DEFAULT_JOB_SERVICES_SUPPLIER);

        Assert.assertThat(createdJob.getJobId(), Matchers.is(jobId));
        Assert.assertTrue(jobTable.getJob(jobId).isPresent());
    }

    /** Requesting the same job id twice yields the very same job instance. */
    @Test
    public void getOrCreateJob_RegisteredJob_WillReturnRegisteredJob() {
        final JobTable.Job firstLookup = jobTable.getOrCreateJob(jobId, DEFAULT_JOB_SERVICES_SUPPLIER);
        final JobTable.Job secondLookup = jobTable.getOrCreateJob(jobId, DEFAULT_JOB_SERVICES_SUPPLIER);

        Assert.assertThat(firstLookup, Matchers.sameInstance(secondLookup));
    }

    /** Closing a job runs the close runnable of the job services it was created with. */
    @Test
    public void closeJob_WillCloseJobServices() throws InterruptedException {
        final OneShotLatch servicesClosed = new OneShotLatch();
        final TestingJobServices jobServices =
                TestingJobServices.newBuilder().setCloseRunnable(servicesClosed::trigger).build();

        final JobTable.Job job = jobTable.getOrCreateJob(jobId, () -> jobServices);
        job.close();

        // Blocks until the close runnable has triggered the latch.
        servicesClosed.await();
    }

    /** A closed job can no longer be looked up in the table. */
    @Test
    public void closeJob_WillRemoveItFromJobTable() {
        final JobTable.Job job = jobTable.getOrCreateJob(jobId, DEFAULT_JOB_SERVICES_SUPPLIER);
        job.close();

        Assert.assertFalse(jobTable.getJob(jobId).isPresent());
    }

    /** Connecting a job yields a connection that is retrievable by job id and by resource id. */
    @Test
    public void connectJob_NotConnected_Succeeds() {
        final JobTable.Job job = jobTable.getOrCreateJob(jobId, DEFAULT_JOB_SERVICES_SUPPLIER);
        final ResourceID resourceId = ResourceID.generate();

        final JobTable.Connection connection = connectJob(job, resourceId);

        Assert.assertThat(connection.getJobId(), Matchers.is(jobId));
        Assert.assertThat(connection.getResourceId(), Matchers.is(resourceId));
        Assert.assertTrue(jobTable.getConnection(jobId).isPresent());
        Assert.assertTrue(jobTable.getConnection(resourceId).isPresent());
    }

    /** Expects an {@link IllegalStateException}; the second connect is the call under test. */
    @Test(expected = IllegalStateException.class)
    public void connectJob_Connected_Fails() {
        final JobTable.Job job = jobTable.getOrCreateJob(jobId, DEFAULT_JOB_SERVICES_SUPPLIER);

        connectJob(job, ResourceID.generate());
        connectJob(job, ResourceID.generate());
    }

    /** After disconnecting, the connection is found neither by job id nor by resource id. */
    @Test
    public void disconnectConnection_RemovesConnection() {
        final JobTable.Job job = jobTable.getOrCreateJob(jobId, DEFAULT_JOB_SERVICES_SUPPLIER);
        final ResourceID resourceId = ResourceID.generate();

        final JobTable.Connection connection = connectJob(job, resourceId);
        connection.disconnect();

        Assert.assertFalse(jobTable.getConnection(jobId).isPresent());
        Assert.assertFalse(jobTable.getConnection(resourceId).isPresent());
    }

    /** Expects an {@link IllegalStateException}; accessing the closed job is the call under test. */
    @Test(expected = IllegalStateException.class)
    public void access_AfterBeingClosed_WillFail() {
        final JobTable.Job job = jobTable.getOrCreateJob(jobId, DEFAULT_JOB_SERVICES_SUPPLIER);

        job.close();
        job.asConnection();
    }

    /** Expects an {@link IllegalStateException}; connecting the closed job is the call under test. */
    @Test(expected = IllegalStateException.class)
    public void connectJob_AfterBeingClosed_WillFail() {
        final JobTable.Job job = jobTable.getOrCreateJob(jobId, DEFAULT_JOB_SERVICES_SUPPLIER);

        job.close();
        connectJob(job, ResourceID.generate());
    }

    /**
     * Expects an {@link IllegalStateException}; reading the gateway from the disconnected
     * connection is the call under test.
     */
    @Test(expected = IllegalStateException.class)
    public void accessJobManagerGateway_AfterBeingDisconnected_WillFail() {
        final JobTable.Job job = jobTable.getOrCreateJob(jobId, DEFAULT_JOB_SERVICES_SUPPLIER);
        final JobTable.Connection connection = connectJob(job, ResourceID.generate());

        connection.disconnect();
        connection.getJobManagerGateway();
    }

    /** Closing the table runs the close runnable of every registered job and empties the table. */
    @Test
    public void close_WillCloseAllRegisteredJobs() throws InterruptedException {
        // One count per registered job; each job's close runnable counts down once.
        final CountDownLatch servicesClosed = new CountDownLatch(2);
        final TestingJobServices firstJobServices =
                TestingJobServices.newBuilder().setCloseRunnable(servicesClosed::countDown).build();
        final TestingJobServices secondJobServices =
                TestingJobServices.newBuilder().setCloseRunnable(servicesClosed::countDown).build();

        jobTable.getOrCreateJob(jobId, () -> firstJobServices);
        jobTable.getOrCreateJob(new JobID(), () -> secondJobServices);

        jobTable.close();

        // Blocks until both close runnables have run.
        servicesClosed.await();
        Assert.assertTrue(jobTable.isEmpty());
    }

    /**
     * Connects the given job under the given resource id, using testing/no-op implementations
     * for every other collaborator.
     *
     * @param job job to connect
     * @param resourceID resource id to register the connection under
     * @return the connection returned by {@code job.connect(...)}
     */
    private JobTable.Connection connectJob(JobTable.Job job, ResourceID resourceID) {
        return job.connect(
                resourceID,
                new TestingJobMasterGatewayBuilder().build(),
                new NoOpTaskManagerActions(),
                NoOpCheckpointResponder.INSTANCE,
                new TestGlobalAggregateManager(),
                new NoOpResultPartitionConsumableNotifier(),
                new NoOpPartitionProducerStateChecker());
    }
}
