package org.apache.flink.runtime.dispatcher;

import java.io.IOException;
import java.time.Duration;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStoreTestUtils;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.util.ManualTicker;
import org.apache.flink.shaded.guava32.com.google.common.base.Ticker;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStoreTest.class */
public class MemoryExecutionGraphInfoStoreTest extends TestLogger {

    @ClassRule
    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorResource();

    @Test
    public void testPut() throws IOException {
        assertPutJobGraphWithStatus(JobStatus.FINISHED);
    }

    @Test
    public void testPutSuspendedJob() throws IOException {
        assertPutJobGraphWithStatus(JobStatus.SUSPENDED);
    }

    @Test
    public void testUnknownGet() throws IOException {
        MemoryExecutionGraphInfoStore createMemoryExecutionGraphInfoStore = createMemoryExecutionGraphInfoStore();
        try {
            Assert.assertThat(createMemoryExecutionGraphInfoStore.get(new JobID()), Matchers.nullValue());
            if (createMemoryExecutionGraphInfoStore != null) {
                createMemoryExecutionGraphInfoStore.close();
            }
        } catch (Throwable th) {
            if (createMemoryExecutionGraphInfoStore != null) {
                try {
                    createMemoryExecutionGraphInfoStore.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testStoredJobsOverview() throws IOException {
        Collection<ExecutionGraphInfo> generateTerminalExecutionGraphInfos = ExecutionGraphInfoStoreTestUtils.generateTerminalExecutionGraphInfos(10);
        JobsOverview create = JobsOverview.create((List) generateTerminalExecutionGraphInfos.stream().map((v0) -> {
            return v0.getArchivedExecutionGraph();
        }).map((v0) -> {
            return v0.getState();
        }).collect(Collectors.toList()));
        MemoryExecutionGraphInfoStore createMemoryExecutionGraphInfoStore = createMemoryExecutionGraphInfoStore();
        try {
            Iterator<ExecutionGraphInfo> it = generateTerminalExecutionGraphInfos.iterator();
            while (it.hasNext()) {
                createMemoryExecutionGraphInfoStore.put(it.next());
            }
            Assert.assertThat(createMemoryExecutionGraphInfoStore.getStoredJobsOverview(), Matchers.equalTo(create));
            if (createMemoryExecutionGraphInfoStore != null) {
                createMemoryExecutionGraphInfoStore.close();
            }
        } catch (Throwable th) {
            if (createMemoryExecutionGraphInfoStore != null) {
                try {
                    createMemoryExecutionGraphInfoStore.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testAvailableJobDetails() throws IOException {
        Collection<ExecutionGraphInfo> generateTerminalExecutionGraphInfos = ExecutionGraphInfoStoreTestUtils.generateTerminalExecutionGraphInfos(10);
        Collection<JobDetails> generateJobDetails = ExecutionGraphInfoStoreTestUtils.generateJobDetails(generateTerminalExecutionGraphInfos);
        MemoryExecutionGraphInfoStore createMemoryExecutionGraphInfoStore = createMemoryExecutionGraphInfoStore();
        try {
            Iterator<ExecutionGraphInfo> it = generateTerminalExecutionGraphInfos.iterator();
            while (it.hasNext()) {
                createMemoryExecutionGraphInfoStore.put(it.next());
            }
            Assert.assertThat(createMemoryExecutionGraphInfoStore.getAvailableJobDetails(), Matchers.containsInAnyOrder(generateJobDetails.toArray()));
            if (createMemoryExecutionGraphInfoStore != null) {
                createMemoryExecutionGraphInfoStore.close();
            }
        } catch (Throwable th) {
            if (createMemoryExecutionGraphInfoStore != null) {
                try {
                    createMemoryExecutionGraphInfoStore.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testExecutionGraphExpiration() throws Exception {
        Duration ofMillis = Duration.ofMillis(1L);
        ManuallyTriggeredScheduledExecutor manuallyTriggeredScheduledExecutor = new ManuallyTriggeredScheduledExecutor();
        ManualTicker manualTicker = new ManualTicker();
        MemoryExecutionGraphInfoStore memoryExecutionGraphInfoStore = new MemoryExecutionGraphInfoStore(ofMillis, Integer.MAX_VALUE, manuallyTriggeredScheduledExecutor, manualTicker);
        try {
            ExecutionGraphInfo executionGraphInfo = new ExecutionGraphInfo(new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build());
            memoryExecutionGraphInfoStore.put(executionGraphInfo);
            Assert.assertThat(Integer.valueOf(memoryExecutionGraphInfoStore.size()), Matchers.equalTo(1));
            manualTicker.advanceTime(ofMillis.toMillis(), TimeUnit.MILLISECONDS);
            manuallyTriggeredScheduledExecutor.triggerScheduledTasks();
            Assert.assertThat(Integer.valueOf(memoryExecutionGraphInfoStore.size()), Matchers.equalTo(0));
            Assert.assertThat(memoryExecutionGraphInfoStore.get(executionGraphInfo.getJobId()), Matchers.nullValue());
            Assert.assertThat(Integer.valueOf(memoryExecutionGraphInfoStore.size()), Matchers.equalTo(0));
            memoryExecutionGraphInfoStore.close();
        } catch (Throwable th) {
            try {
                memoryExecutionGraphInfoStore.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testCloseCleansUp() throws IOException {
        MemoryExecutionGraphInfoStore createMemoryExecutionGraphInfoStore = createMemoryExecutionGraphInfoStore();
        try {
            Assert.assertThat(Integer.valueOf(createMemoryExecutionGraphInfoStore.size()), Matchers.equalTo(0));
            createMemoryExecutionGraphInfoStore.put(new ExecutionGraphInfo(new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build()));
            Assert.assertThat(Integer.valueOf(createMemoryExecutionGraphInfoStore.size()), Matchers.equalTo(1));
            createMemoryExecutionGraphInfoStore.close();
            Assert.assertThat(Integer.valueOf(createMemoryExecutionGraphInfoStore.size()), Matchers.equalTo(0));
            if (createMemoryExecutionGraphInfoStore != null) {
                createMemoryExecutionGraphInfoStore.close();
            }
        } catch (Throwable th) {
            if (createMemoryExecutionGraphInfoStore != null) {
                try {
                    createMemoryExecutionGraphInfoStore.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testMaximumCapacity() throws IOException {
        Collection<ExecutionGraphInfo> generateTerminalExecutionGraphInfos = ExecutionGraphInfoStoreTestUtils.generateTerminalExecutionGraphInfos(10);
        Collection<ExecutionGraphInfo> generateTerminalExecutionGraphInfos2 = ExecutionGraphInfoStoreTestUtils.generateTerminalExecutionGraphInfos(10);
        Collection<JobDetails> generateJobDetails = ExecutionGraphInfoStoreTestUtils.generateJobDetails(generateTerminalExecutionGraphInfos2);
        MemoryExecutionGraphInfoStore createMemoryExecutionGraphInfoStore = createMemoryExecutionGraphInfoStore(Duration.ofHours(1L), 10);
        try {
            Iterator<ExecutionGraphInfo> it = generateTerminalExecutionGraphInfos.iterator();
            while (it.hasNext()) {
                createMemoryExecutionGraphInfoStore.put(it.next());
                Assert.assertTrue(createMemoryExecutionGraphInfoStore.size() <= 10);
            }
            Iterator<ExecutionGraphInfo> it2 = generateTerminalExecutionGraphInfos2.iterator();
            while (it2.hasNext()) {
                createMemoryExecutionGraphInfoStore.put(it2.next());
                Assert.assertEquals(10L, createMemoryExecutionGraphInfoStore.size());
            }
            Assert.assertThat(createMemoryExecutionGraphInfoStore.getAvailableJobDetails(), Matchers.containsInAnyOrder(generateJobDetails.toArray()));
            if (createMemoryExecutionGraphInfoStore != null) {
                createMemoryExecutionGraphInfoStore.close();
            }
        } catch (Throwable th) {
            if (createMemoryExecutionGraphInfoStore != null) {
                try {
                    createMemoryExecutionGraphInfoStore.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private void assertPutJobGraphWithStatus(JobStatus jobStatus) throws IOException {
        ExecutionGraphInfo executionGraphInfo = new ExecutionGraphInfo(new ArchivedExecutionGraphBuilder().setState(jobStatus).build());
        MemoryExecutionGraphInfoStore createMemoryExecutionGraphInfoStore = createMemoryExecutionGraphInfoStore();
        try {
            Assert.assertThat(Integer.valueOf(createMemoryExecutionGraphInfoStore.size()), Matchers.equalTo(0));
            createMemoryExecutionGraphInfoStore.put(executionGraphInfo);
            Assert.assertThat(Integer.valueOf(createMemoryExecutionGraphInfoStore.size()), Matchers.equalTo(1));
            Assert.assertThat(createMemoryExecutionGraphInfoStore.get(executionGraphInfo.getJobId()), new ExecutionGraphInfoStoreTestUtils.PartialExecutionGraphInfoMatcher(executionGraphInfo));
            if (createMemoryExecutionGraphInfoStore != null) {
                createMemoryExecutionGraphInfoStore.close();
            }
        } catch (Throwable th) {
            if (createMemoryExecutionGraphInfoStore != null) {
                try {
                    createMemoryExecutionGraphInfoStore.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private MemoryExecutionGraphInfoStore createMemoryExecutionGraphInfoStore() {
        return new MemoryExecutionGraphInfoStore();
    }

    private MemoryExecutionGraphInfoStore createMemoryExecutionGraphInfoStore(Duration duration, int i) {
        return new MemoryExecutionGraphInfoStore(duration, i, new ScheduledExecutorServiceAdapter((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor()), Ticker.systemTicker());
    }
}
