package org.apache.flink.runtime.jobmanager;

import java.util.Arrays;
import java.util.BitSet;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.reader.RecordReader;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.types.IntValue;
import org.apache.flink.util.TestLogger;
import org.junit.ClassRule;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.class */
public class SlotCountExceedingParallelismTest extends TestLogger {
    private static final int NUMBER_OF_TMS = 2;
    private static final int NUMBER_OF_SLOTS_PER_TM = 2;
    private static final int PARALLELISM = 4;
    public static final String JOB_NAME = "SlotCountExceedingParallelismTest (no slot sharing, blocking results)";

    @ClassRule
    public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(getFlinkConfiguration()).setNumberTaskManagers(2).setNumberSlotsPerTaskManager(2).build());

    /* loaded from: input_file:org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest$RoundRobinSubtaskIndexSender.class */
    public static class RoundRobinSubtaskIndexSender extends AbstractInvokable {
        public static final String CONFIG_KEY = "number-of-times-to-send";

        public RoundRobinSubtaskIndexSender(Environment environment) {
            super(environment);
        }

        public void invoke() throws Exception {
            RecordWriter build = new RecordWriterBuilder().build(getEnvironment().getWriter(0));
            int integer = getTaskConfiguration().getInteger(CONFIG_KEY, 0);
            IntValue intValue = new IntValue(getEnvironment().getTaskInfo().getIndexOfThisSubtask());
            for (int i = 0; i < integer; i++) {
                try {
                    build.emit(intValue);
                } catch (Throwable th) {
                    build.close();
                    throw th;
                }
            }
            build.flushAll();
            build.close();
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest$SubtaskIndexReceiver.class */
    public static class SubtaskIndexReceiver extends AbstractInvokable {
        public static final String CONFIG_KEY = "number-of-indexes-to-receive";

        public SubtaskIndexReceiver(Environment environment) {
            super(environment);
        }

        public void invoke() throws Exception {
            RecordReader recordReader = new RecordReader(getEnvironment().getInputGate(0), IntValue.class, getEnvironment().getTaskManagerInfo().getTmpDirectories());
            try {
                int integer = getTaskConfiguration().getInteger(CONFIG_KEY, 0);
                BitSet bitSet = new BitSet(integer);
                int i = 0;
                while (true) {
                    IntValue next = recordReader.next();
                    if (next == null) {
                        if (bitSet.cardinality() != integer) {
                            throw new IllegalStateException("Finished receive, but did not receive all expected subtask indexes.");
                        }
                        return;
                    }
                    i++;
                    if (i > integer) {
                        throw new IllegalStateException("Received more records than expected.");
                    }
                    int value = next.getValue();
                    if (bitSet.get(value)) {
                        throw new IllegalStateException("Received expected subtask index twice.");
                    }
                    bitSet.set(value, true);
                }
            } finally {
                recordReader.clearBuffers();
            }
        }
    }

    private static Configuration getFlinkConfiguration() {
        Configuration configuration = new Configuration();
        configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, TestingUtils.DEFAULT_ASK_TIMEOUT);
        return configuration;
    }

    @Test
    public void testNoSlotSharingAndBlockingResultSender() throws Exception {
        submitJobGraphAndWait(createTestJobGraph(JOB_NAME, 8, 4));
    }

    @Test
    public void testNoSlotSharingAndBlockingResultReceiver() throws Exception {
        submitJobGraphAndWait(createTestJobGraph(JOB_NAME, 4, 8));
    }

    @Test
    public void testNoSlotSharingAndBlockingResultBoth() throws Exception {
        submitJobGraphAndWait(createTestJobGraph(JOB_NAME, 8, 8));
    }

    private void submitJobGraphAndWait(JobGraph jobGraph) throws JobExecutionException, InterruptedException {
        MINI_CLUSTER_RESOURCE.getMiniCluster().executeJobBlocking(jobGraph);
    }

    private JobGraph createTestJobGraph(String str, int i, int i2) {
        JobVertex jobVertex = new JobVertex("Sender");
        jobVertex.setInvokableClass(RoundRobinSubtaskIndexSender.class);
        jobVertex.getConfiguration().setInteger(RoundRobinSubtaskIndexSender.CONFIG_KEY, i2);
        jobVertex.setParallelism(i);
        JobVertex jobVertex2 = new JobVertex("Receiver");
        jobVertex2.setInvokableClass(SubtaskIndexReceiver.class);
        jobVertex2.getConfiguration().setInteger(SubtaskIndexReceiver.CONFIG_KEY, i);
        jobVertex2.setParallelism(i2);
        jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        return JobGraphBuilder.newBatchJobGraphBuilder().setJobName(str).addJobVertices(Arrays.asList(jobVertex, jobVertex2)).build();
    }
}
