package org.apache.flink.runtime.jobmanager.scheduler;

import java.util.ArrayList;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.SlotCountExceedingParallelismTest;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.testutils.junit.category.AlsoRunWithLegacyScheduler;
import org.apache.flink.types.IntValue;
import org.apache.flink.util.TestLogger;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({AlsoRunWithLegacyScheduler.class})
/* loaded from: input_file:org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.class */
public class ScheduleOrUpdateConsumersTest extends TestLogger {
    private static final int NUMBER_OF_TMS = 2;
    private static final int NUMBER_OF_SLOTS_PER_TM = 2;
    private static final int PARALLELISM = 4;

    @ClassRule
    public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(getFlinkConfiguration()).setNumberTaskManagers(2).setNumberSlotsPerTaskManager(2).build());

    /* loaded from: input_file:org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest$BinaryRoundRobinSubtaskIndexSender.class */
    public static class BinaryRoundRobinSubtaskIndexSender extends AbstractInvokable {
        static final String CONFIG_KEY = "number-of-times-to-send";

        public BinaryRoundRobinSubtaskIndexSender(Environment environment) {
            super(environment);
        }

        public void invoke() throws Exception {
            ArrayList<RecordWriter> newArrayListWithCapacity = Lists.newArrayListWithCapacity(2);
            RecordWriter build = new RecordWriterBuilder().build(getEnvironment().getWriter(0));
            RecordWriter build2 = new RecordWriterBuilder().build(getEnvironment().getWriter(1));
            newArrayListWithCapacity.add(build);
            newArrayListWithCapacity.add(build2);
            int integer = getTaskConfiguration().getInteger("number-of-times-to-send", 0);
            IntValue intValue = new IntValue(getEnvironment().getTaskInfo().getIndexOfThisSubtask());
            for (RecordWriter recordWriter : newArrayListWithCapacity) {
                for (int i = 0; i < integer; i++) {
                    try {
                        recordWriter.emit(intValue);
                    } catch (Throwable th) {
                        recordWriter.clearBuffers();
                        throw th;
                    }
                }
                recordWriter.flushAll();
                recordWriter.clearBuffers();
            }
        }
    }

    private static Configuration getFlinkConfiguration() {
        Configuration configuration = new Configuration();
        configuration.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
        return configuration;
    }

    @Test
    public void testMixedPipelinedAndBlockingResults() throws Exception {
        JobVertex jobVertex = new JobVertex("Sender");
        jobVertex.setInvokableClass(BinaryRoundRobinSubtaskIndexSender.class);
        jobVertex.getConfiguration().setInteger(SlotCountExceedingParallelismTest.RoundRobinSubtaskIndexSender.CONFIG_KEY, 4);
        jobVertex.setParallelism(4);
        JobVertex jobVertex2 = new JobVertex("Pipelined Receiver");
        jobVertex2.setInvokableClass(SlotCountExceedingParallelismTest.SubtaskIndexReceiver.class);
        jobVertex2.getConfiguration().setInteger(SlotCountExceedingParallelismTest.SubtaskIndexReceiver.CONFIG_KEY, 4);
        jobVertex2.setParallelism(4);
        jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        JobVertex jobVertex3 = new JobVertex("Blocking Receiver");
        jobVertex3.setInvokableClass(SlotCountExceedingParallelismTest.SubtaskIndexReceiver.class);
        jobVertex3.getConfiguration().setInteger(SlotCountExceedingParallelismTest.SubtaskIndexReceiver.CONFIG_KEY, 4);
        jobVertex3.setParallelism(4);
        jobVertex3.connectNewDataSetAsInput(jobVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
        jobVertex.setSlotSharingGroup(slotSharingGroup);
        jobVertex2.setSlotSharingGroup(slotSharingGroup);
        jobVertex3.setSlotSharingGroup(slotSharingGroup);
        MINI_CLUSTER_RESOURCE.getMiniCluster().executeJobBlocking(new JobGraph("Mixed pipelined and blocking result", new JobVertex[]{jobVertex, jobVertex2, jobVertex3}));
    }
}
