package org.apache.flink.runtime.jobgraph;

import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.io.FinalizeOnMaster;
import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.api.common.io.InitializeOnMaster;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.runtime.executiongraph.SimpleInitializeOnMasterContext;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.runtime.util.JobVertexConnectionUtils;
import org.apache.flink.testutils.junit.SharedObjectsExtension;
import org.apache.flink.testutils.junit.SharedReference;
import org.apache.flink.util.InstantiationUtil;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/* loaded from: input_file:org/apache/flink/runtime/jobgraph/JobTaskVertexTest.class */
class JobTaskVertexTest {

    @RegisterExtension
    final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();

    /* loaded from: input_file:org/apache/flink/runtime/jobgraph/JobTaskVertexTest$TestClassLoader.class */
    private static class TestClassLoader extends URLClassLoader {
        public TestClassLoader() {
            super(new URL[0], Thread.currentThread().getContextClassLoader());
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/jobgraph/JobTaskVertexTest$TestException.class */
    private static final class TestException extends IOException {
        private TestException() {
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/jobgraph/JobTaskVertexTest$TestInitializeOutputFormat.class */
    private static final class TestInitializeOutputFormat implements OutputFormat<Object>, InitializeOnMaster {
        private final SharedReference<AtomicInteger> globalParallelism;

        private TestInitializeOutputFormat(SharedReference<AtomicInteger> sharedReference) {
            this.globalParallelism = sharedReference;
        }

        public void configure(Configuration configuration) {
        }

        public void open(OutputFormat.InitializationContext initializationContext) throws IOException {
        }

        public void writeRecord(Object obj) throws IOException {
        }

        public void close() throws IOException {
        }

        public void initializeGlobal(int i) throws IOException {
            ((AtomicInteger) this.globalParallelism.get()).set(i);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/jobgraph/JobTaskVertexTest$TestInputFormat.class */
    private static final class TestInputFormat extends GenericInputFormat<Object> {
        private boolean isConfigured = false;
        private final Configuration expectedParameters;

        public TestInputFormat(Configuration configuration) {
            this.expectedParameters = configuration;
        }

        public boolean reachedEnd() {
            return false;
        }

        public Object nextRecord(Object obj) {
            return null;
        }

        /* renamed from: createInputSplits, reason: merged with bridge method [inline-methods] */
        public GenericInputSplit[] m207createInputSplits(int i) {
            if (this.isConfigured) {
                return new GenericInputSplit[]{new TestSplit(0, 1)};
            }
            throw new IllegalStateException("InputFormat was not configured before createInputSplits was called.");
        }

        public void configure(Configuration configuration) {
            if (this.isConfigured) {
                throw new IllegalStateException("InputFormat is already configured.");
            }
            if (!(Thread.currentThread().getContextClassLoader() instanceof TestClassLoader)) {
                throw new IllegalStateException("Context ClassLoader was not correctly switched.");
            }
            for (String str : this.expectedParameters.keySet()) {
                Assertions.assertThat(configuration.getString(str, (String) null)).isEqualTo(this.expectedParameters.getString(str, (String) null));
            }
            this.isConfigured = true;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/jobgraph/JobTaskVertexTest$TestSplit.class */
    public static final class TestSplit extends GenericInputSplit {
        public TestSplit(int i, int i2) {
            super(i, i2);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/jobgraph/JobTaskVertexTest$TestingOutputFormat.class */
    private static final class TestingOutputFormat implements InitializeOnMaster, FinalizeOnMaster, OutputFormat<Object> {
        private boolean isConfigured = false;
        private final Configuration expectedParameters;

        public TestingOutputFormat(Configuration configuration) {
            this.expectedParameters = configuration;
        }

        public void initializeGlobal(int i) throws IOException {
            if (!this.isConfigured) {
                throw new IllegalStateException("OutputFormat was not configured before initializeGlobal was called.");
            }
            if (!(Thread.currentThread().getContextClassLoader() instanceof TestClassLoader)) {
                throw new IllegalStateException("Context ClassLoader was not correctly switched.");
            }
            throw new TestException();
        }

        public void finalizeGlobal(FinalizeOnMaster.FinalizationContext finalizationContext) throws IOException {
            if (!this.isConfigured) {
                throw new IllegalStateException("OutputFormat was not configured before finalizeGlobal was called.");
            }
            if (!(Thread.currentThread().getContextClassLoader() instanceof TestClassLoader)) {
                throw new IllegalStateException("Context ClassLoader was not correctly switched.");
            }
            throw new TestException();
        }

        public void configure(Configuration configuration) {
            if (this.isConfigured) {
                throw new IllegalStateException("OutputFormat is already configured.");
            }
            if (!(Thread.currentThread().getContextClassLoader() instanceof TestClassLoader)) {
                throw new IllegalStateException("Context ClassLoader was not correctly switched.");
            }
            for (String str : this.expectedParameters.keySet()) {
                Assertions.assertThat(configuration.getString(str, (String) null)).isEqualTo(this.expectedParameters.getString(str, (String) null));
            }
            this.isConfigured = true;
        }

        public void open(OutputFormat.InitializationContext initializationContext) throws IOException {
        }

        public void writeRecord(Object obj) throws IOException {
        }

        public void close() throws IOException {
        }
    }

    JobTaskVertexTest() {
    }

    @Test
    void testMultipleConsumersVertices() {
        JobVertex jobVertex = new JobVertex("producer");
        JobVertex jobVertex2 = new JobVertex("consumer1");
        JobVertex jobVertex3 = new JobVertex("consumer2");
        IntermediateDataSetID intermediateDataSetID = new IntermediateDataSetID();
        JobVertexConnectionUtils.connectNewDataSetAsInput(jobVertex2, jobVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING, intermediateDataSetID, false);
        JobVertexConnectionUtils.connectNewDataSetAsInput(jobVertex3, jobVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING, intermediateDataSetID, false);
        JobVertex jobVertex4 = new JobVertex("consumer3");
        JobVertexConnectionUtils.connectNewDataSetAsInput(jobVertex4, jobVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        Assertions.assertThat(jobVertex.getProducedDataSets()).hasSize(2);
        IntermediateDataSet intermediateDataSet = (IntermediateDataSet) jobVertex.getProducedDataSets().get(0);
        Assertions.assertThat(intermediateDataSet.getId()).isEqualTo(intermediateDataSetID);
        List consumers = intermediateDataSet.getConsumers();
        Assertions.assertThat(consumers).hasSize(2);
        Assertions.assertThat(((JobEdge) consumers.get(0)).getTarget().getID()).isEqualTo(jobVertex2.getID());
        Assertions.assertThat(((JobEdge) consumers.get(1)).getTarget().getID()).isEqualTo(jobVertex3.getID());
        List consumers2 = ((IntermediateDataSet) jobVertex.getProducedDataSets().get(1)).getConsumers();
        Assertions.assertThat(consumers2).hasSize(1);
        Assertions.assertThat(((JobEdge) consumers2.get(0)).getTarget().getID()).isEqualTo(jobVertex4.getID());
    }

    @Test
    void testConnectDirectly() {
        JobVertex jobVertex = new JobVertex("source");
        JobVertex jobVertex2 = new JobVertex("target");
        JobVertexConnectionUtils.connectNewDataSetAsInput(jobVertex2, jobVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        Assertions.assertThat(jobVertex.isInputVertex()).isTrue();
        Assertions.assertThat(jobVertex.isOutputVertex()).isFalse();
        Assertions.assertThat(jobVertex2.isInputVertex()).isFalse();
        Assertions.assertThat(jobVertex2.isOutputVertex()).isTrue();
        Assertions.assertThat(jobVertex.getNumberOfProducedIntermediateDataSets()).isEqualTo(1);
        Assertions.assertThat(jobVertex2.getNumberOfInputs()).isEqualTo(1);
        Assertions.assertThat((IntermediateDataSet) jobVertex.getProducedDataSets().get(0)).isEqualTo(((JobEdge) jobVertex2.getInputs().get(0)).getSource());
        Assertions.assertThat(((JobEdge) ((IntermediateDataSet) jobVertex.getProducedDataSets().get(0)).getConsumers().get(0)).getTarget()).isEqualTo(jobVertex2);
    }

    @Test
    void testOutputFormat() throws Exception {
        InputOutputFormatVertex inputOutputFormatVertex = new InputOutputFormatVertex("Name");
        OperatorID operatorID = new OperatorID();
        Configuration configuration = new Configuration();
        configuration.setString("test_key", "test_value");
        new InputOutputFormatContainer(Thread.currentThread().getContextClassLoader()).addOutputFormat(operatorID, new TestingOutputFormat(configuration)).addParameters(operatorID, configuration).write(new TaskConfig(inputOutputFormatVertex.getConfiguration()));
        TestClassLoader testClassLoader = new TestClassLoader();
        Assertions.assertThatThrownBy(() -> {
            inputOutputFormatVertex.initializeOnMaster(new SimpleInitializeOnMasterContext(testClassLoader, inputOutputFormatVertex.getParallelism()));
        }).isInstanceOf(TestException.class);
        InputOutputFormatVertex clone = InstantiationUtil.clone(inputOutputFormatVertex);
        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        Assertions.assertThatThrownBy(() -> {
            clone.initializeOnMaster(new SimpleInitializeOnMasterContext(testClassLoader, clone.getParallelism()));
        }).isInstanceOf(TestException.class);
        Assertions.assertThat(Thread.currentThread().getContextClassLoader()).as("Previous classloader was not restored.", new Object[0]).isEqualTo(contextClassLoader);
        Assertions.assertThatThrownBy(() -> {
            clone.finalizeOnMaster(new JobVertex.FinalizeOnMasterContext() { // from class: org.apache.flink.runtime.jobgraph.JobTaskVertexTest.1
                public ClassLoader getClassLoader() {
                    return testClassLoader;
                }

                public int getExecutionParallelism() {
                    return clone.getParallelism();
                }

                public int getFinishedAttempt(int i) {
                    return 0;
                }
            });
        }).isInstanceOf(TestException.class);
        Assertions.assertThat(Thread.currentThread().getContextClassLoader()).as("Previous classloader was not restored.", new Object[0]).isEqualTo(contextClassLoader);
    }

    @Test
    void testInputFormat() throws Exception {
        InputOutputFormatVertex inputOutputFormatVertex = new InputOutputFormatVertex("Name");
        OperatorID operatorID = new OperatorID();
        Configuration configuration = new Configuration();
        configuration.setString("test_key", "test_value");
        new InputOutputFormatContainer(Thread.currentThread().getContextClassLoader()).addInputFormat(operatorID, new TestInputFormat(configuration)).addParameters(operatorID, "test_key", "test_value").write(new TaskConfig(inputOutputFormatVertex.getConfiguration()));
        inputOutputFormatVertex.initializeOnMaster(new SimpleInitializeOnMasterContext(new TestClassLoader(), inputOutputFormatVertex.getParallelism()));
        InputSplit[] createInputSplits = inputOutputFormatVertex.getInputSplitSource().createInputSplits(77);
        Assertions.assertThat(createInputSplits).isNotNull();
        Assertions.assertThat(createInputSplits).hasSize(1);
        Assertions.assertThat(createInputSplits[0].getClass()).isEqualTo(TestSplit.class);
    }

    @Test
    void testOutputFormatUsesCorrectParallelism() throws Exception {
        InputOutputFormatVertex inputOutputFormatVertex = new InputOutputFormatVertex("Name");
        inputOutputFormatVertex.setParallelism(1);
        OperatorID operatorID = new OperatorID();
        SharedReference add = this.sharedObjects.add(new AtomicInteger());
        new InputOutputFormatContainer(Thread.currentThread().getContextClassLoader()).addOutputFormat(operatorID, new TestInitializeOutputFormat(add)).write(new TaskConfig(inputOutputFormatVertex.getConfiguration()));
        int i = 1 + 3;
        TestClassLoader testClassLoader = new TestClassLoader();
        try {
            inputOutputFormatVertex.initializeOnMaster(new SimpleInitializeOnMasterContext(testClassLoader, i));
            Assertions.assertThat(((AtomicInteger) add.get()).get()).isEqualTo(i);
            testClassLoader.close();
        } catch (Throwable th) {
            try {
                testClassLoader.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }
}
