/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint.channel;

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutor;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorFactory;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectAssert;
import org.junit.jupiter.api.Test;

public class ChannelStateWriteRequestExecutorFactoryTest {
    private static final CheckpointStorage CHECKPOINT_STORAGE = new JobManagerCheckpointStorage();

    @Test
    void testReuseExecutorForSameJobId() throws IOException {
        this.assertReuseExecutor(1);
        this.assertReuseExecutor(2);
        this.assertReuseExecutor(3);
        this.assertReuseExecutor(5);
        this.assertReuseExecutor(10);
    }

    private void assertReuseExecutor(int maxSubtasksPerChannelStateFile) throws IOException {
        JobID JOB_ID = new JobID();
        Random RANDOM = new Random();
        ChannelStateWriteRequestExecutorFactory executorFactory = new ChannelStateWriteRequestExecutorFactory(JOB_ID);
        int numberOfTasks = 100;
        ChannelStateWriteRequestExecutor currentExecutor = null;
        for (int i = 0; i < numberOfTasks; ++i) {
            ChannelStateWriteRequestExecutor newExecutor = executorFactory.getOrCreateExecutor(new JobVertexID(), RANDOM.nextInt(numberOfTasks), () -> CHECKPOINT_STORAGE.createCheckpointStorage(JOB_ID), maxSubtasksPerChannelStateFile);
            if (i % maxSubtasksPerChannelStateFile == 0) {
                ((ObjectAssert)Assertions.assertThat((Object)newExecutor).as("Factory should create the new executor.", new Object[0])).isNotSameAs(currentExecutor);
                currentExecutor = newExecutor;
                continue;
            }
            ((ObjectAssert)Assertions.assertThat((Object)newExecutor).as("Factory should reuse the old executor.", new Object[0])).isSameAs(currentExecutor);
        }
    }

    @Test
    void testSomeSubtasksCloseDuringOtherSubtasksStarting() throws Exception {
        JobID jobID = new JobID();
        JobVertexID jobVertexID = new JobVertexID();
        int numberOfSubtask = 100000;
        int maxSubtasksPerChannelStateFile = 10;
        ChannelStateWriteRequestExecutorFactory executorFactory = new ChannelStateWriteRequestExecutorFactory(jobID);
        LinkedBlockingQueue queue = new LinkedBlockingQueue(100);
        CompletableFuture createFuture = new CompletableFuture();
        new Thread(() -> {
            try {
                for (int i = 0; i < numberOfSubtask; ++i) {
                    ChannelStateWriteRequestExecutor executor = executorFactory.getOrCreateExecutor(jobVertexID, i, () -> CHECKPOINT_STORAGE.createCheckpointStorage(jobID), maxSubtasksPerChannelStateFile, false);
                    Assertions.assertThat((Object)executor).isNotNull();
                    queue.put(executor);
                }
                createFuture.complete(null);
            }
            catch (Throwable e) {
                createFuture.completeExceptionally(e);
            }
        }).start();
        CompletableFuture releaseFuture = new CompletableFuture();
        new Thread(() -> {
            try {
                for (int i = 0; i < numberOfSubtask; ++i) {
                    ChannelStateWriteRequestExecutor executor = (ChannelStateWriteRequestExecutor)queue.take();
                    executor.releaseSubtask(jobVertexID, numberOfSubtask);
                }
                releaseFuture.complete(null);
            }
            catch (Throwable e) {
                releaseFuture.completeExceptionally(e);
            }
        }).start();
        createFuture.get();
        releaseFuture.get();
    }
}

