package org.apache.flink.runtime.scheduler.adaptivebatch;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.BatchShuffleMode;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.eventtime.WatermarkAlignmentParams;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.mocks.MockSource;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSplitEnumerator;
import org.apache.flink.configuration.BatchExecutionOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.executiongraph.ResultPartitionBytes;
import org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.failover.FixedDelayRestartBackoffTimeStrategy;
import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTrackerImpl;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmaster.event.ExecutionVertexFinishedEvent;
import org.apache.flink.runtime.jobmaster.event.FileSystemJobEventStore;
import org.apache.flink.runtime.jobmaster.event.JobEvent;
import org.apache.flink.runtime.jobmaster.event.JobEventManager;
import org.apache.flink.runtime.jobmaster.event.JobEventStore;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.operators.coordination.EventReceivingTasks;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder;
import org.apache.flink.runtime.operators.coordination.TestingOperatorCoordinator;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.shuffle.DefaultShuffleMetrics;
import org.apache.flink.runtime.shuffle.JobShuffleContextImpl;
import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
import org.apache.flink.runtime.shuffle.PartitionWithMetrics;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.shuffle.ShuffleMasterContextImpl;
import org.apache.flink.runtime.shuffle.ShuffleMetrics;
import org.apache.flink.runtime.source.coordinator.SourceCoordinator;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider;
import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.util.JobVertexConnectionUtils;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.util.function.ThrowingRunnable;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

@ExtendWith({ParameterizedTestExtension.class})
/* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptivebatch/BatchJobRecoveryTest.class */
public class BatchJobRecoveryTest {

    /** JUnit-managed temporary directory; setUp() creates a fresh sub-folder in it for the event store. */
    @TempDir
    private Path temporaryFolder;
    /** Gateways handed to the source coordinator when splits are assigned; recreated in setUp(). */
    protected EventReceivingTasks receivingTasks;
    private static final int NUM_SPLITS = 10;
    private static final int SOURCE_PARALLELISM = 5;
    private static final int MIDDLE_PARALLELISM = 5;
    private static final int DECIDED_SINK_PARALLELISM = 2;
    private SourceCoordinatorProvider<MockSourceSplit> provider;
    /** Event store shared by the scheduler before and after the simulated failover. */
    private FileSystemJobEventStore jobEventStore;
    /** Set by TestingFileSystemJobEventStore on the first readEvent() call. */
    private AtomicBoolean recoveryStarted;
    /** Every event written by the event store, in write order. */
    private List<JobEvent> persistedJobEventList;
    /** The default job graph serialized once in setUp(); tests deserialize their own copies. */
    private byte[] serializedJobGraph;

    @Parameter
    public boolean enableSpeculativeExecution;

    @Parameter(1)
    public boolean isBlockingShuffle;

    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();

    @RegisterExtension
    static final TestingComponentMainThreadExecutor.Extension MAIN_EXECUTOR_RESOURCE = new TestingComponentMainThreadExecutor.Extension();
    private static final OperatorID OPERATOR_ID = new OperatorID(1234, 5678);
    private static final JobVertexID SOURCE_ID = new JobVertexID();
    private static final JobVertexID MIDDLE_ID = new JobVertexID();
    private static final JobVertexID SINK_ID = new JobVertexID();
    private static final JobID JOB_ID = new JobID();
    private final Duration previousWorkerRecoveryTimeout = Duration.ofSeconds(1);
    private final TestingComponentMainThreadExecutor mainThreadExecutor = MAIN_EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor();
    /** Reassigned in setUp(); testReplayEventFailed swaps in a manually triggered executor. */
    private ScheduledExecutor delayedExecutor = new ScheduledExecutorServiceAdapter((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
    /** Cleared in setUp() before every test. */
    private final Collection<PartitionWithMetrics> allPartitionWithMetrics = new ArrayList<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptivebatch/BatchJobRecoveryTest$TestPartitionWithMetrics.class */
    public static class TestPartitionWithMetrics implements PartitionWithMetrics {
        private final ResultPartitionID resultPartitionID;
        private final ShuffleMetrics metrics;

        public TestPartitionWithMetrics(ResultPartitionID resultPartitionID, ShuffleMetrics shuffleMetrics) {
            this.resultPartitionID = resultPartitionID;
            this.metrics = shuffleMetrics;
        }

        public ShuffleMetrics getPartitionMetrics() {
            return this.metrics;
        }

        public ShuffleDescriptor getPartition() {
            return new NettyShuffleDescriptor(ResourceID.generate(), null, this.resultPartitionID) { // from class: org.apache.flink.runtime.scheduler.adaptivebatch.BatchJobRecoveryTest.TestPartitionWithMetrics.1
                public Optional<ResourceID> storesLocalResourcesOn() {
                    return Optional.empty();
                }
            };
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptivebatch/BatchJobRecoveryTest$TestingFileSystemJobEventStore.class */
    private static class TestingFileSystemJobEventStore extends FileSystemJobEventStore {
        private final List<JobEvent> persistedJobEventList;
        private final AtomicBoolean recoveryStarted;

        public TestingFileSystemJobEventStore(org.apache.flink.core.fs.Path path, Configuration configuration, List<JobEvent> list, AtomicBoolean atomicBoolean) throws IOException {
            super(path, configuration);
            this.persistedJobEventList = list;
            this.recoveryStarted = atomicBoolean;
        }

        protected void writeEventRunnable(JobEvent jobEvent, boolean z) {
            super.writeEventRunnable(jobEvent, z);
            this.persistedJobEventList.add(jobEvent);
        }

        public JobEvent readEvent() throws Exception {
            this.recoveryStarted.compareAndSet(false, true);
            return super.readEvent();
        }
    }

    /**
     * Parameter combinations (enableSpeculativeExecution, isBlockingShuffle) injected by
     * ParameterizedTestExtension into this class's test templates.
     */
    @Parameters(name = "enableSpeculativeExecution={0}, isBlockingShuffle={1}")
    public static Collection<Object[]> parameters() {
        Object[][] combinations = {
            {false, false},
            {false, true},
            {true, true},
            {true, false},
        };
        return Arrays.asList(combinations);
    }

    /**
     * Creates fresh per-test state: the event store in a new temp sub-folder, the event-recording
     * list and flag, the source coordinator provider, and the serialized default job graph.
     */
    @BeforeEach
    void setUp() throws IOException {
        org.apache.flink.core.fs.Path path = new org.apache.flink.core.fs.Path(TempDirUtils.newFolder(this.temporaryFolder).getAbsolutePath());
        this.delayedExecutor = new ScheduledExecutorServiceAdapter((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        this.receivingTasks = EventReceivingTasks.createForRunningTasks();
        // The event store appends to this list from its write routine (writeEventRunnable) while
        // the test thread copies it when polling for persisted events. A copy-on-write list makes
        // that concurrent append/copy safe; a plain ArrayList does not.
        this.persistedJobEventList = new java.util.concurrent.CopyOnWriteArrayList<>();
        this.recoveryStarted = new AtomicBoolean();
        this.jobEventStore = new TestingFileSystemJobEventStore(path, new Configuration(), this.persistedJobEventList, this.recoveryStarted);
        this.provider = new SourceCoordinatorProvider<>("AdaptiveBatchSchedulerTest", OPERATOR_ID, new MockSource(Boundedness.BOUNDED, NUM_SPLITS), 1, WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED, (String) null);
        this.serializedJobGraph = serializeJobGraph(createDefaultJobGraph());
        this.allPartitionWithMetrics.clear();
    }

    /** Stops the job event store created in setUp() once each test has finished. */
    @AfterEach
    void after() {
        final FileSystemJobEventStore store = jobEventStore;
        // Tests pass 'false' when simulating a failover; teardown passes 'true'.
        store.stop(true);
    }

    /**
     * Simulates a JobMaster failover after every source subtask has finished and every middle
     * subtask is running. A freshly created scheduler then recovers from the persisted events.
     *
     * <p>Expected after recovery:
     * <ul>
     *   <li>each source vertex keeps its pre-failover attempt ID, is FINISHED, has its produced
     *       partitions tracked, and keeps the same number of subpartitions;
     *   <li>each middle vertex gets a new attempt and reaches DEPLOYING.
     * </ul>
     */
    @TestTemplate
    void testRecoverFromJMFailover() throws Exception {
        AdaptiveBatchScheduler createScheduler = createScheduler(deserializeJobGraph(this.serializedJobGraph));
        Objects.requireNonNull(createScheduler);
        runInMainThread(createScheduler::startScheduling);
        runInMainThread(() -> {
            transitionExecutionsState(createScheduler, ExecutionState.FINISHED, SOURCE_ID);
        });
        // Middle subtasks are moved through INITIALIZING before RUNNING.
        runInMainThread(() -> {
            transitionExecutionsState(createScheduler, ExecutionState.INITIALIZING, MIDDLE_ID);
            transitionExecutionsState(createScheduler, ExecutionState.RUNNING, MIDDLE_ID);
        });
        // Pre-failover attempt IDs, used below to tell recovered vertices from restarted ones.
        List<ExecutionAttemptID> currentAttemptIds = getCurrentAttemptIds(createScheduler.getExecutionJobVertex(SOURCE_ID));
        List<ExecutionAttemptID> currentAttemptIds2 = getCurrentAttemptIds(createScheduler.getExecutionJobVertex(MIDDLE_ID));
        // Subpartition count of the first produced partition of each source vertex, keyed by
        // partition ID; compared against the recovered graph below.
        HashMap hashMap = new HashMap();
        Iterator<ExecutionVertex> it = getExecutionVertices(SOURCE_ID, createScheduler.getExecutionGraph()).iterator();
        while (it.hasNext()) {
            IntermediateResultPartition intermediateResultPartition = (IntermediateResultPartition) it.next().getProducedPartitions().values().iterator().next();
            hashMap.put(intermediateResultPartition.getPartitionId(), Integer.valueOf(intermediateResultPartition.getNumberOfSubpartitions()));
        }
        // One ExecutionVertexFinishedEvent per source subtask (SOURCE_PARALLELISM = 5).
        waitUntilWriteExecutionVertexFinishedEventPersisted(5);
        // Stop the event store (stop(false), unlike stop(true) in after()) to simulate the failover.
        runInMainThread(() -> {
            this.jobEventStore.stop(false);
        });
        // registerPartitions is defined outside this view; presumably it makes the old scheduler's
        // produced partitions visible to the next scheduler — confirm.
        registerPartitions(createScheduler);
        AdaptiveBatchScheduler createScheduler2 = createScheduler(deserializeJobGraph(this.serializedJobGraph));
        startSchedulingAndWaitRecoverFinish(createScheduler2);
        for (ExecutionVertex executionVertex : getExecutionVertices(SOURCE_ID, createScheduler2.getExecutionGraph())) {
            Assertions.assertThat(currentAttemptIds).contains(new ExecutionAttemptID[]{executionVertex.getCurrentExecutionAttempt().getAttemptId()});
            Assertions.assertThat(executionVertex.getExecutionState()).isEqualTo(ExecutionState.FINISHED);
            JobMasterPartitionTracker partitionTracker = createScheduler2.getExecutionGraph().getPartitionTracker();
            Stream stream = executionVertex.getProducedPartitions().keySet().stream();
            DefaultExecutionGraph executionGraph = createScheduler2.getExecutionGraph();
            Objects.requireNonNull(executionGraph);
            Iterator it2 = ((List) stream.map(executionGraph::createResultPartitionId).collect(Collectors.toList())).iterator();
            while (it2.hasNext()) {
                Assertions.assertThat(partitionTracker.isPartitionTracked((ResultPartitionID) it2.next())).isTrue();
            }
            IntermediateResultPartition intermediateResultPartition2 = (IntermediateResultPartition) executionVertex.getProducedPartitions().values().iterator().next();
            Assertions.assertThat(intermediateResultPartition2.getNumberOfSubpartitions()).isEqualTo(hashMap.get(intermediateResultPartition2.getPartitionId()));
        }
        // Middle subtasks were running, not finished, so they must have been restarted.
        for (ExecutionVertex executionVertex2 : getExecutionVertices(MIDDLE_ID, createScheduler2.getExecutionGraph())) {
            Assertions.assertThat(currentAttemptIds2).doesNotContain(new ExecutionAttemptID[]{executionVertex2.getCurrentExecutionAttempt().getAttemptId()});
            ExecutionGraphTestUtils.waitUntilExecutionVertexState(executionVertex2, ExecutionState.DEPLOYING, 15000L);
        }
    }

    /**
     * Simulates a JobMaster failover while the middle vertex carries a TestingOperatorCoordinator
     * and only its subtask 0 has finished. Source subtask 0 is passed specially to
     * registerPartitions.
     *
     * <p>Expected after recovery:
     * <ul>
     *   <li>source subtask 0 is redeployed;
     *   <li>the other source subtasks keep their attempts, are FINISHED, and have tracked partitions;
     *   <li>every middle subtask gets a new attempt — subtask 0 ends in CREATED (blocking shuffle)
     *       or DEPLOYING, the rest reach DEPLOYING.
     * </ul>
     */
    @TestTemplate
    void testJobVertexUnFinishedAndOperatorCoordinatorNotSupportBatchSnapshot() throws Exception {
        JobGraph deserializeJobGraph = deserializeJobGraph(this.serializedJobGraph);
        JobVertex findVertexByID = deserializeJobGraph.findVertexByID(MIDDLE_ID);
        // Attach a TestingOperatorCoordinator to the middle vertex; per the test name it presumably
        // does not support batch snapshots — confirm.
        findVertexByID.addOperatorCoordinator(new SerializedValue(new TestingOperatorCoordinator.Provider(((OperatorIDPair) findVertexByID.getOperatorIDs().get(0)).getGeneratedOperatorID())));
        // NOTE(review): Duration.ZERO goes to a createScheduler overload defined outside this view;
        // confirm which timeout it controls.
        AdaptiveBatchScheduler createScheduler = createScheduler(deserializeJobGraph, Duration.ZERO);
        Objects.requireNonNull(createScheduler);
        runInMainThread(createScheduler::startScheduling);
        runInMainThread(() -> {
            transitionExecutionsState(createScheduler, ExecutionState.FINISHED, SOURCE_ID);
        });
        // Finish only middle subtask 0, leaving the middle vertex unfinished.
        runInMainThread(() -> {
            AdaptiveBatchSchedulerTest.transitionExecutionsState(createScheduler, ExecutionState.FINISHED, Collections.singletonList(getExecutionVertex(MIDDLE_ID, 0, createScheduler.getExecutionGraph()).getCurrentExecutionAttempt()), null);
        });
        List<ExecutionAttemptID> currentAttemptIds = getCurrentAttemptIds(createScheduler.getExecutionJobVertex(SOURCE_ID));
        List<ExecutionAttemptID> currentAttemptIds2 = getCurrentAttemptIds(createScheduler.getExecutionJobVertex(MIDDLE_ID));
        // 5 source subtasks + middle subtask 0.
        waitUntilWriteExecutionVertexFinishedEventPersisted(6);
        runInMainThread(() -> {
            this.jobEventStore.stop(false);
        });
        // NOTE(review): the two sets are interpreted by registerPartitions (outside this view);
        // source subtask 0 is the one expected to be redeployed below.
        registerPartitions(createScheduler, Collections.emptySet(), Collections.singleton(createScheduler.getExecutionJobVertex(SOURCE_ID).getTaskVertices()[0].getID()));
        AdaptiveBatchScheduler createScheduler2 = createScheduler(deserializeJobGraph);
        startSchedulingAndWaitRecoverFinish(createScheduler2);
        for (ExecutionVertex executionVertex : getExecutionVertices(SOURCE_ID, createScheduler2.getExecutionGraph())) {
            if (executionVertex.getParallelSubtaskIndex() == 0) {
                ExecutionGraphTestUtils.waitUntilExecutionVertexState(executionVertex, ExecutionState.DEPLOYING, 15000L);
            } else {
                Assertions.assertThat(currentAttemptIds).contains(new ExecutionAttemptID[]{executionVertex.getCurrentExecutionAttempt().getAttemptId()});
                Assertions.assertThat(executionVertex.getExecutionState()).isEqualTo(ExecutionState.FINISHED);
                JobMasterPartitionTracker partitionTracker = createScheduler2.getExecutionGraph().getPartitionTracker();
                Stream stream = executionVertex.getProducedPartitions().keySet().stream();
                DefaultExecutionGraph executionGraph = createScheduler2.getExecutionGraph();
                Objects.requireNonNull(executionGraph);
                Iterator it = ((List) stream.map(executionGraph::createResultPartitionId).collect(Collectors.toList())).iterator();
                while (it.hasNext()) {
                    Assertions.assertThat(partitionTracker.isPartitionTracked((ResultPartitionID) it.next())).isTrue();
                }
            }
        }
        // No middle attempt survives, including the one that had finished before the failover.
        for (ExecutionVertex executionVertex2 : getExecutionVertices(MIDDLE_ID, createScheduler2.getExecutionGraph())) {
            Assertions.assertThat(currentAttemptIds2).doesNotContain(new ExecutionAttemptID[]{executionVertex2.getCurrentExecutionAttempt().getAttemptId()});
            if (executionVertex2.getParallelSubtaskIndex() == 0) {
                Assertions.assertThat(executionVertex2.getExecutionState()).isEqualTo(this.isBlockingShuffle ? ExecutionState.CREATED : ExecutionState.DEPLOYING);
            } else {
                ExecutionGraphTestUtils.waitUntilExecutionVertexState(executionVertex2, ExecutionState.DEPLOYING, 15000L);
            }
        }
    }

    /**
     * Simulates a JobMaster failover after the source vertex, which carries a
     * TestingOperatorCoordinator, has fully finished. It then triggers a PartitionNotFoundException
     * from middle subtask 0 on the recovered scheduler.
     *
     * <p>Expected:
     * <ul>
     *   <li>right after recovery, all sources are restored (same attempts, FINISHED, partitions
     *       tracked) and every middle subtask reaches DEPLOYING;
     *   <li>after the partition-not-found failure, source subtask 0 is redeployed and none of the
     *       five source subtasks remains FINISHED.
     * </ul>
     */
    @TestTemplate
    void testJobVertexFinishedAndOperatorCoordinatorNotSupportBatchSnapshotAndPartitionNotFound() throws Exception {
        JobGraph deserializeJobGraph = deserializeJobGraph(this.serializedJobGraph);
        JobVertex findVertexByID = deserializeJobGraph.findVertexByID(SOURCE_ID);
        // Attach a TestingOperatorCoordinator to the source vertex; per the test name it presumably
        // does not support batch snapshots — confirm.
        findVertexByID.addOperatorCoordinator(new SerializedValue(new TestingOperatorCoordinator.Provider(((OperatorIDPair) findVertexByID.getOperatorIDs().get(0)).getGeneratedOperatorID())));
        AdaptiveBatchScheduler createScheduler = createScheduler(deserializeJobGraph);
        Objects.requireNonNull(createScheduler);
        runInMainThread(createScheduler::startScheduling);
        runInMainThread(() -> {
            transitionExecutionsState(createScheduler, ExecutionState.FINISHED, SOURCE_ID);
        });
        List<ExecutionAttemptID> currentAttemptIds = getCurrentAttemptIds(createScheduler.getExecutionJobVertex(SOURCE_ID));
        // One ExecutionVertexFinishedEvent per source subtask.
        waitUntilWriteExecutionVertexFinishedEventPersisted(5);
        runInMainThread(() -> {
            this.jobEventStore.stop(false);
        });
        registerPartitions(createScheduler);
        AdaptiveBatchScheduler createScheduler2 = createScheduler(deserializeJobGraph);
        startSchedulingAndWaitRecoverFinish(createScheduler2);
        for (ExecutionVertex executionVertex : getExecutionVertices(SOURCE_ID, createScheduler2.getExecutionGraph())) {
            Assertions.assertThat(currentAttemptIds).contains(new ExecutionAttemptID[]{executionVertex.getCurrentExecutionAttempt().getAttemptId()});
            Assertions.assertThat(executionVertex.getExecutionState()).isEqualTo(ExecutionState.FINISHED);
            JobMasterPartitionTracker partitionTracker = createScheduler2.getExecutionGraph().getPartitionTracker();
            Stream stream = executionVertex.getProducedPartitions().keySet().stream();
            DefaultExecutionGraph executionGraph = createScheduler2.getExecutionGraph();
            Objects.requireNonNull(executionGraph);
            Iterator it = ((List) stream.map(executionGraph::createResultPartitionId).collect(Collectors.toList())).iterator();
            while (it.hasNext()) {
                Assertions.assertThat(partitionTracker.isPartitionTracked((ResultPartitionID) it.next())).isTrue();
            }
        }
        Iterator<ExecutionVertex> it2 = getExecutionVertices(MIDDLE_ID, createScheduler2.getExecutionGraph()).iterator();
        while (it2.hasNext()) {
            ExecutionGraphTestUtils.waitUntilExecutionVertexState(it2.next(), ExecutionState.DEPLOYING, 15000L);
        }
        // NOTE(review): this targets the pre-failover scheduler (createScheduler), not the
        // recovered createScheduler2 used everywhere else below — confirm this is intentional.
        runInMainThread(() -> {
            transitionExecutionsState(createScheduler, ExecutionState.RUNNING, MIDDLE_ID);
        });
        triggerFailedByDataConsumptionException(createScheduler2, getExecutionVertex(MIDDLE_ID, 0, createScheduler2.getExecutionGraph()));
        ExecutionGraphTestUtils.waitUntilExecutionVertexState(getExecutionVertex(SOURCE_ID, 0, createScheduler2.getExecutionGraph()), ExecutionState.DEPLOYING, 15000L);
        // 5 == SOURCE_PARALLELISM: no source subtask may stay FINISHED after the failure.
        for (int i = 0; i < 5; i++) {
            Assertions.assertThat(getExecutionVertex(SOURCE_ID, i, createScheduler2.getExecutionGraph()).getExecutionState()).isNotEqualTo(ExecutionState.FINISHED);
        }
    }

    /**
     * Simulates a JobMaster failover after all sources finished, with source subtask 0 passed
     * specially to registerPartitions (presumably as having an unavailable partition — confirm).
     *
     * <p>Expected after recovery: source subtask 0 gets a new attempt and is redeployed; the other
     * source subtasks keep their attempts and stay FINISHED.
     */
    @TestTemplate
    void testRecoverFromJMFailoverAndPartitionsUnavailable() throws Exception {
        AdaptiveBatchScheduler createScheduler = createScheduler(deserializeJobGraph(this.serializedJobGraph));
        Objects.requireNonNull(createScheduler);
        runInMainThread(createScheduler::startScheduling);
        runInMainThread(() -> {
            transitionExecutionsState(createScheduler, ExecutionState.FINISHED, SOURCE_ID);
        });
        List<ExecutionAttemptID> currentAttemptIds = getCurrentAttemptIds(createScheduler.getExecutionJobVertex(SOURCE_ID));
        // One ExecutionVertexFinishedEvent per source subtask.
        waitUntilWriteExecutionVertexFinishedEventPersisted(5);
        runInMainThread(() -> {
            this.jobEventStore.stop(false);
        });
        registerPartitions(createScheduler, Collections.emptySet(), Collections.singleton(getExecutionVertex(SOURCE_ID, 0, createScheduler.getExecutionGraph()).getID()));
        AdaptiveBatchScheduler createScheduler2 = createScheduler(deserializeJobGraph(this.serializedJobGraph));
        startSchedulingAndWaitRecoverFinish(createScheduler2);
        List<ExecutionVertex> executionVertices = getExecutionVertices(SOURCE_ID, createScheduler2.getExecutionGraph());
        for (int i = 0; i < executionVertices.size(); i++) {
            ExecutionVertex executionVertex = executionVertices.get(i);
            if (i == 0) {
                // The subtask singled out above must be restarted.
                Assertions.assertThat(currentAttemptIds).doesNotContain(new ExecutionAttemptID[]{executionVertex.getCurrentExecutionAttempt().getAttemptId()});
                ExecutionGraphTestUtils.waitUntilExecutionVertexState(executionVertex, ExecutionState.DEPLOYING, 15000L);
            } else {
                Assertions.assertThat(currentAttemptIds).contains(new ExecutionAttemptID[]{executionVertex.getCurrentExecutionAttempt().getAttemptId()});
                Assertions.assertThat(executionVertex.getExecutionState()).isEqualTo(ExecutionState.FINISHED);
            }
        }
    }

    /**
     * Runs the whole job to completion and then recovers with a second scheduler built from the
     * same JobGraph instance, not a freshly deserialized copy.
     *
     * <p>Expected: every source, middle and sink vertex keeps its attempt and is FINISHED, and the
     * sink's parallelism is 2 (DECIDED_SINK_PARALLELISM).
     */
    @TestTemplate
    void testRecoverDecidedParallelismFromTheSameJobGraphInstance() throws Exception {
        JobGraph deserializeJobGraph = deserializeJobGraph(this.serializedJobGraph);
        AdaptiveBatchScheduler createScheduler = createScheduler(deserializeJobGraph);
        Objects.requireNonNull(createScheduler);
        runInMainThread(createScheduler::startScheduling);
        runInMainThread(() -> {
            transitionExecutionsState(createScheduler, ExecutionState.FINISHED, SOURCE_ID);
        });
        runInMainThread(() -> {
            transitionExecutionsState(createScheduler, ExecutionState.FINISHED, MIDDLE_ID);
        });
        runInMainThread(() -> {
            transitionExecutionsState(createScheduler, ExecutionState.FINISHED, SINK_ID);
        });
        List<ExecutionAttemptID> currentAttemptIds = getCurrentAttemptIds(createScheduler.getExecutionJobVertex(SOURCE_ID));
        List<ExecutionAttemptID> currentAttemptIds2 = getCurrentAttemptIds(createScheduler.getExecutionJobVertex(MIDDLE_ID));
        List<ExecutionAttemptID> currentAttemptIds3 = getCurrentAttemptIds(createScheduler.getExecutionJobVertex(SINK_ID));
        // 5 source + 5 middle + 2 sink subtasks.
        waitUntilWriteExecutionVertexFinishedEventPersisted(12);
        runInMainThread(() -> {
            this.jobEventStore.stop(false);
        });
        // Reuse the same JobGraph instance that the first scheduler worked on.
        AdaptiveBatchScheduler createScheduler2 = createScheduler(deserializeJobGraph);
        startSchedulingAndWaitRecoverFinish(createScheduler2);
        for (ExecutionVertex executionVertex : getExecutionVertices(SOURCE_ID, createScheduler2.getExecutionGraph())) {
            Assertions.assertThat(currentAttemptIds).contains(new ExecutionAttemptID[]{executionVertex.getCurrentExecutionAttempt().getAttemptId()});
            Assertions.assertThat(executionVertex.getExecutionState()).isEqualTo(ExecutionState.FINISHED);
        }
        for (ExecutionVertex executionVertex2 : getExecutionVertices(MIDDLE_ID, createScheduler2.getExecutionGraph())) {
            Assertions.assertThat(currentAttemptIds2).contains(new ExecutionAttemptID[]{executionVertex2.getCurrentExecutionAttempt().getAttemptId()});
            Assertions.assertThat(executionVertex2.getExecutionState()).isEqualTo(ExecutionState.FINISHED);
        }
        // 2 == DECIDED_SINK_PARALLELISM.
        Assertions.assertThat(createScheduler2.getExecutionJobVertex(SINK_ID).getParallelism()).isEqualTo(2);
        for (ExecutionVertex executionVertex3 : getExecutionVertices(SINK_ID, createScheduler2.getExecutionGraph())) {
            Assertions.assertThat(currentAttemptIds3).contains(new ExecutionAttemptID[]{executionVertex3.getCurrentExecutionAttempt().getAttemptId()});
            Assertions.assertThat(executionVertex3.getExecutionState()).isEqualTo(ExecutionState.FINISHED);
        }
    }

    /**
     * After a JobMaster failover, triggers a PartitionNotFoundException twice (from middle subtasks
     * 0 and then 1). It checks the source coordinator's unassigned-split count around each failure.
     *
     * <p>Expected: no unassigned splits right after recovery; 2 unassigned splits after each
     * failure. 2 presumably equals NUM_SPLITS / SOURCE_PARALLELISM; it depends on MockSplitEnumerator
     * assignment, so confirm.
     */
    @TestTemplate
    void testPartitionNotFoundTwiceAfterJMFailover() throws Exception {
        AdaptiveBatchScheduler createScheduler = createScheduler(deserializeJobGraph(this.serializedJobGraph));
        Objects.requireNonNull(createScheduler);
        runInMainThread(createScheduler::startScheduling);
        // Register every source subtask as a reader so all splits get assigned.
        runInMainThread(() -> {
            SourceCoordinator<?, ?> internalSourceCoordinator = getInternalSourceCoordinator(createScheduler.getExecutionGraph(), SOURCE_ID);
            assignSplitsForAllSubTask(internalSourceCoordinator, getCurrentAttemptIds(createScheduler.getExecutionJobVertex(SOURCE_ID)));
            checkUnassignedSplits(internalSourceCoordinator, 0);
        });
        runInMainThread(() -> {
            transitionExecutionsState(createScheduler, ExecutionState.FINISHED, SOURCE_ID);
        });
        waitUntilWriteExecutionVertexFinishedEventPersisted(5);
        runInMainThread(() -> {
            this.jobEventStore.stop(false);
        });
        registerPartitions(createScheduler);
        AdaptiveBatchScheduler createScheduler2 = createScheduler(deserializeJobGraph(this.serializedJobGraph));
        startSchedulingAndWaitRecoverFinish(createScheduler2);
        SourceCoordinator<?, ?> internalSourceCoordinator = getInternalSourceCoordinator(createScheduler2.getExecutionGraph(), SOURCE_ID);
        runInMainThread(() -> {
            checkUnassignedSplits(internalSourceCoordinator, 0);
        });
        // First failure: middle subtask 0.
        ExecutionVertex executionVertex = getExecutionVertex(MIDDLE_ID, 0, createScheduler2.getExecutionGraph());
        triggerFailedByDataConsumptionException(createScheduler2, executionVertex);
        ExecutionState executionState = this.isBlockingShuffle ? ExecutionState.CREATED : ExecutionState.DEPLOYING;
        ExecutionGraphTestUtils.waitUntilExecutionVertexState(executionVertex, executionState, 15000L);
        runInMainThread(() -> {
            checkUnassignedSplits(internalSourceCoordinator, 2);
        });
        // Re-register source subtask 0 so its splits are assigned again, then finish the sources.
        runInMainThread(() -> {
            assignSplits(internalSourceCoordinator, getExecutionVertex(SOURCE_ID, 0, createScheduler2.getExecutionGraph()).getCurrentExecutionAttempt().getAttemptId());
            checkUnassignedSplits(internalSourceCoordinator, 0);
        });
        runInMainThread(() -> {
            transitionExecutionsState(createScheduler2, ExecutionState.FINISHED, SOURCE_ID);
        });
        // Second failure: middle subtask 1.
        ExecutionVertex executionVertex2 = getExecutionVertex(MIDDLE_ID, 1, createScheduler2.getExecutionGraph());
        triggerFailedByDataConsumptionException(createScheduler2, executionVertex2);
        ExecutionGraphTestUtils.waitUntilExecutionVertexState(executionVertex2, executionState, 15000L);
        runInMainThread(() -> {
            checkUnassignedSplits(internalSourceCoordinator, 2);
        });
    }

    /**
     * Uses a job event store whose {@code readEvent()} always throws and whose {@code isEmpty()}
     * reports {@code false}.
     *
     * <p>Expected: the execution graph's failure cause contains "Recover failed from JM failover",
     * and every source vertex ends up on attempt number 1 and reaches DEPLOYING.
     */
    @TestTemplate
    void testReplayEventFailed() throws Exception {
        JobEventStore jobEventStore = new JobEventStore() { // Stub store: non-empty, but every read fails.
            public void start() {
            }

            public void stop(boolean z) {
            }

            public void writeEvent(JobEvent jobEvent, boolean z) {
            }

            public JobEvent readEvent() throws Exception {
                throw new Exception();
            }

            public boolean isEmpty() {
                return false;
            }
        };
        // Replace the delayed executor so scheduled work only runs when triggered explicitly below.
        ManuallyTriggeredScheduledExecutor manuallyTriggeredScheduledExecutor = new ManuallyTriggeredScheduledExecutor();
        this.delayedExecutor = manuallyTriggeredScheduledExecutor;
        AdaptiveBatchScheduler createScheduler = createScheduler(deserializeJobGraph(this.serializedJobGraph), jobEventStore, ((Integer) BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MAX_PARALLELISM.defaultValue()).intValue(), (Duration) BatchExecutionOptions.JOB_RECOVERY_SNAPSHOT_MIN_PAUSE.defaultValue());
        Objects.requireNonNull(createScheduler);
        runInMainThread(createScheduler::startScheduling);
        manuallyTriggeredScheduledExecutor.triggerScheduledTasks();
        // Empty main-thread task; presumably used to wait until earlier main-thread work has run.
        runInMainThread(() -> {
        });
        Assertions.assertThat(ExceptionUtils.findThrowableWithMessage(createScheduler.getExecutionGraph().getFailureCause(), "Recover failed from JM failover")).isPresent();
        for (ExecutionVertex executionVertex : getExecutionVertices(SOURCE_ID, createScheduler.getExecutionGraph())) {
            Assertions.assertThat(executionVertex.getCurrentExecutionAttempt().getAttemptNumber()).isEqualTo(1);
            ExecutionGraphTestUtils.waitUntilExecutionVertexState(executionVertex, ExecutionState.DEPLOYING, 15000L);
        }
    }

    /**
     * Polls until the recorded event list contains exactly {@code expectedCount}
     * {@link ExecutionVertexFinishedEvent}s.
     *
     * @param expectedCount number of finished-vertex events to wait for
     */
    private void waitUntilWriteExecutionVertexFinishedEventPersisted(int expectedCount) throws Exception {
        CommonTestUtils.waitUntilCondition(() -> {
            // Count over a copy rather than the live list.
            List<JobEvent> snapshot = new ArrayList<>(this.persistedJobEventList);
            long finishedEvents = 0;
            for (JobEvent event : snapshot) {
                if (event instanceof ExecutionVertexFinishedEvent) {
                    finishedEvents++;
                }
            }
            return finishedEvents == expectedCount;
        });
    }

    /**
     * On the main thread, transitions the current attempt of {@code executionVertex} to FAILED.
     * The failure cause is a {@link PartitionNotFoundException} for the first result partition
     * the vertex consumes.
     *
     * @param schedulerBase scheduler owning the execution graph
     * @param executionVertex vertex whose current attempt is failed
     */
    private void triggerFailedByDataConsumptionException(
            SchedulerBase schedulerBase, ExecutionVertex executionVertex) {
        runInMainThread(
                () -> {
                    final ExecutionGraph graph = schedulerBase.getExecutionGraph();
                    final IntermediateResultPartitionID firstConsumedPartition =
                            getConsumedResultPartitions(
                                            graph.getSchedulingTopology(), executionVertex.getID())
                                    .get(0);
                    final PartitionNotFoundException failureCause =
                            new PartitionNotFoundException(
                                    graph.createResultPartitionId(firstConsumedPartition));
                    AdaptiveBatchSchedulerTest.transitionExecutionsState(
                            schedulerBase,
                            ExecutionState.FAILED,
                            Collections.singletonList(executionVertex.getCurrentExecutionAttempt()),
                            failureCause);
                });
    }

    /**
     * Marks the given execution attempt as ready on {@code sourceCoordinator}, using a gateway from
     * {@code receivingTasks}. It then sends a {@link ReaderRegistrationEvent} for that subtask
     * with location {@code "location_<subtask>"}. Presumably this is what triggers split
     * assignment in the enumerator — confirm.
     */
    private void assignSplits(
            SourceCoordinator<?, ?> sourceCoordinator, ExecutionAttemptID executionAttemptID) {
        final int subtask = executionAttemptID.getSubtaskIndex();
        final int attempt = executionAttemptID.getAttemptNumber();

        sourceCoordinator.executionAttemptReady(
                subtask, attempt, receivingTasks.createGatewayForSubtask(subtask, attempt));

        final String readerLocation = "location_" + subtask;
        sourceCoordinator.handleEventFromOperator(
                subtask, attempt, new ReaderRegistrationEvent(subtask, readerLocation));
    }

    /**
     * Calls {@code assignSplits} for each execution attempt in {@code list}, in list order.
     *
     * @param sourceCoordinator coordinator that receives the registrations
     * @param list execution attempts to register
     */
    private void assignSplitsForAllSubTask(
            SourceCoordinator<?, ?> sourceCoordinator, List<ExecutionAttemptID> list) {
        for (ExecutionAttemptID attemptId : list) {
            assignSplits(sourceCoordinator, attemptId);
        }
    }

    /**
     * Asserts, on the coordinator's executor, that the coordinator's enumerator reports exactly
     * {@code i} unassigned splits.
     *
     * @param sourceCoordinator coordinator whose enumerator is inspected; the enumerator must be a
     *     {@link MockSplitEnumerator}
     * @param i expected number of unassigned splits
     */
    private void checkUnassignedSplits(SourceCoordinator<?, ?> sourceCoordinator, int i) {
        // NOTE(review): getEnumerator() on a wildcard-typed SourceCoordinator is assumed to be
        // declared as the generic SplitEnumerator interface, so an explicit cast is needed to reach
        // MockSplitEnumerator#getUnassignedSplits — confirm.
        final MockSplitEnumerator enumerator =
                (MockSplitEnumerator) sourceCoordinator.getEnumerator();
        runInCoordinatorThread(
                sourceCoordinator,
                () -> Assertions.assertThat(enumerator.getUnassignedSplits()).hasSize(i));
    }

    /**
     * Submits {@code runnable} to the coordinator's executor and waits for it to finish. The test
     * fails if the runnable throws or the wait is interrupted.
     *
     * @param sourceCoordinator coordinator whose executor runs the action
     * @param runnable action to run on the coordinator executor
     */
    private void runInCoordinatorThread(
            SourceCoordinator<?, ?> sourceCoordinator, Runnable runnable) {
        try {
            sourceCoordinator.getCoordinatorExecutor().submit(runnable).get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the interruption is not silently lost.
            Thread.currentThread().interrupt();
            org.junit.jupiter.api.Assertions.fail("Test failed due to " + e, e);
        } catch (Exception e) {
            // Attach the original exception as the cause so the real stack trace
            // (e.g. a failing assertion inside the runnable) is reported.
            org.junit.jupiter.api.Assertions.fail("Test failed due to " + e, e);
        }
    }

    /**
     * Hands {@code throwingRunnable} to {@code mainThreadExecutor}. That is the same component
     * whose main-thread executor {@code createScheduler} passes to the scheduler builder.
     *
     * @param throwingRunnable action to execute
     */
    private void runInMainThread(@Nonnull ThrowingRunnable<Throwable> throwingRunnable) {
        mainThreadExecutor.execute(throwingRunnable);
    }

    /**
     * Registers the partitions of every finished producer, without excluding any job vertex or
     * execution vertex.
     *
     * @param adaptiveBatchScheduler scheduler whose execution graph is scanned
     */
    private void registerPartitions(AdaptiveBatchScheduler adaptiveBatchScheduler) {
        final Set<JobVertexID> noExcludedJobVertices = Collections.emptySet();
        final Set<ExecutionVertexID> noExcludedExecutionVertices = Collections.emptySet();
        registerPartitions(
                adaptiveBatchScheduler, noExcludedJobVertices, noExcludedExecutionVertices);
    }

    /**
     * Adds a {@link TestPartitionWithMetrics} to {@code allPartitionWithMetrics} for every
     * intermediate result partition whose producer is in state FINISHED. Partitions are skipped
     * when the producer's job vertex is in {@code set} or the producer execution vertex is in
     * {@code set2}.
     *
     * @param adaptiveBatchScheduler scheduler whose execution graph is scanned
     * @param set job vertices whose produced partitions must not be registered
     * @param set2 execution vertices whose produced partitions must not be registered
     */
    private void registerPartitions(
            AdaptiveBatchScheduler adaptiveBatchScheduler,
            Set<JobVertexID> set,
            Set<ExecutionVertexID> set2) {
        final ExecutionGraph executionGraph = adaptiveBatchScheduler.getExecutionGraph();
        this.allPartitionWithMetrics.addAll(
                executionGraph.getAllIntermediateResults().values().stream()
                        .flatMap(result -> Arrays.stream(result.getPartitions()))
                        .filter(
                                partition -> {
                                    final ExecutionVertex producer =
                                            executionGraph
                                                    .getResultPartitionOrThrow(
                                                            partition.getPartitionId())
                                                    .getProducer();
                                    return !set.contains(producer.getJobvertexId())
                                            && !set2.contains(producer.getID())
                                            && producer.getExecutionState()
                                                    == ExecutionState.FINISHED;
                                })
                        .map(
                                partition -> {
                                    final BlockingResultInfo blockingResultInfo =
                                            adaptiveBatchScheduler.getBlockingResultInfo(
                                                    partition.getIntermediateResult().getId());
                                    final IntermediateResultPartitionID partitionId =
                                            partition.getPartitionId();
                                    final ResultPartitionID resultPartitionId =
                                            new ResultPartitionID(
                                                    partitionId,
                                                    executionGraph
                                                            .getResultPartitionOrThrow(partitionId)
                                                            .getProducer()
                                                            .getPartitionProducer()
                                                            .getAttemptId());
                                    // Zero-filled byte counts sized by getNumSubpartitions(0);
                                    // empty when the scheduler has no BlockingResultInfo for
                                    // this result.
                                    final long[] subpartitionBytes =
                                            blockingResultInfo == null
                                                    ? new long[0]
                                                    : new long
                                                            [blockingResultInfo
                                                                    .getNumSubpartitions(0)];
                                    return new TestPartitionWithMetrics(
                                            resultPartitionId,
                                            new DefaultShuffleMetrics(
                                                    new ResultPartitionBytes(subpartitionBytes)));
                                })
                        .collect(Collectors.toList()));
    }

    /**
     * Starts scheduling on the main thread. It then waits until {@code recoveryStarted} is set and
     * the scheduler no longer reports {@code isRecovering()}.
     *
     * @param adaptiveBatchScheduler scheduler to start; must not be null
     * @throws Exception if waiting for the condition fails
     */
    private void startSchedulingAndWaitRecoverFinish(AdaptiveBatchScheduler adaptiveBatchScheduler)
            throws Exception {
        Objects.requireNonNull(adaptiveBatchScheduler);
        runInMainThread(() -> adaptiveBatchScheduler.startScheduling());
        CommonTestUtils.waitUntilCondition(
                () -> recoveryStarted.get() && !adaptiveBatchScheduler.isRecovering());
    }

    /**
     * Returns the internal coordinator of the first operator coordinator registered on the job
     * vertex {@code jobVertexID}, cast to {@link SourceCoordinator}.
     *
     * @param executionGraph graph containing the job vertex
     * @param jobVertexID id of the vertex carrying the source coordinator
     * @return the unwrapped source coordinator
     * @throws Exception if retrieving the internal coordinator fails
     */
    private static SourceCoordinator<?, ?> getInternalSourceCoordinator(
            ExecutionGraph executionGraph, JobVertexID jobVertexID) throws Exception {
        final OperatorCoordinatorHolder holder =
                new ArrayList<>(executionGraph.getJobVertex(jobVertexID).getOperatorCoordinators())
                        .get(0);
        // NOTE(review): assumes the holder's coordinator is a RecreateOnResetOperatorCoordinator
        // wrapping a SourceCoordinator; the interface type OperatorCoordinator exposes no
        // getInternalCoordinator(), so explicit casts are required — confirm.
        final org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator
                recreatingCoordinator =
                        (org.apache.flink.runtime.operators.coordination
                                        .RecreateOnResetOperatorCoordinator)
                                holder.coordinator();
        return (SourceCoordinator<?, ?>) recreatingCoordinator.getInternalCoordinator();
    }

    /**
     * Returns the ids of all result partitions consumed by the vertex {@code executionVertexID}.
     * Ids are returned in the iteration order of {@code getConsumedResults()}.
     *
     * @param schedulingTopology topology to look the vertex up in
     * @param executionVertexID the consuming vertex
     * @return ids of the consumed result partitions
     */
    private static List<IntermediateResultPartitionID> getConsumedResultPartitions(
            SchedulingTopology schedulingTopology, ExecutionVertexID executionVertexID) {
        return StreamSupport.stream(
                        schedulingTopology
                                .getVertex(executionVertexID)
                                .getConsumedResults()
                                .spliterator(),
                        false)
                .map(consumedResult -> consumedResult.getId())
                .collect(Collectors.toList());
    }

    /**
     * Resolves {@code jobVertexID} to its {@link JobVertex}. It then delegates to {@code
     * AdaptiveBatchSchedulerTest.transitionExecutionsState(SchedulerBase, ExecutionState,
     * JobVertex)}.
     *
     * @param schedulerBase scheduler owning the job vertex
     * @param executionState target state forwarded to the delegate
     * @param jobVertexID id of the job vertex whose executions are transitioned
     */
    public static void transitionExecutionsState(
            SchedulerBase schedulerBase, ExecutionState executionState, JobVertexID jobVertexID) {
        final JobVertex jobVertex =
                schedulerBase.getExecutionJobVertex(jobVertexID).getJobVertex();
        AdaptiveBatchSchedulerTest.transitionExecutionsState(
                schedulerBase, executionState, jobVertex);
    }

    /**
     * Builds the three-vertex job graph {@code source -> middle -> sink}.
     *
     * <p>{@code source} has parallelism 5 and carries the serialized operator coordinator {@code
     * provider}. It connects POINTWISE to {@code middle}, which also has parallelism 5. {@code
     * middle} connects ALL_TO_ALL to {@code sink}, whose parallelism is left unset. Both edges use
     * BLOCKING partitions when {@code isBlockingShuffle} is true and HYBRID_FULL otherwise.
     *
     * @return the job graph with id {@code JOB_ID} and name "TestJob"
     * @throws IOException if the coordinator provider cannot be serialized
     */
    private JobGraph createDefaultJobGraph() throws IOException {
        final JobVertex source = new JobVertex("source", SOURCE_ID);
        source.setInvokableClass(NoOpInvokable.class);
        source.addOperatorCoordinator(new SerializedValue<>(this.provider));
        source.setParallelism(5);

        final JobVertex middle = new JobVertex("middle", MIDDLE_ID);
        middle.setInvokableClass(NoOpInvokable.class);
        middle.setParallelism(5);

        final JobVertex sink = new JobVertex("sink", SINK_ID);
        sink.setInvokableClass(NoOpInvokable.class);

        final ResultPartitionType resultPartitionType =
                this.isBlockingShuffle
                        ? ResultPartitionType.BLOCKING
                        : ResultPartitionType.HYBRID_FULL;
        JobVertexConnectionUtils.connectNewDataSetAsInput(
                middle, source, DistributionPattern.POINTWISE, resultPartitionType);
        JobVertexConnectionUtils.connectNewDataSetAsInput(
                sink, middle, DistributionPattern.ALL_TO_ALL, resultPartitionType);

        return new JobGraph(JOB_ID, "TestJob", source, middle, sink);
    }

    /**
     * Builds the three-vertex job graph {@code source -> middle -> sink} with HYBRID_FULL
     * partitions on both edges, regardless of {@code isBlockingShuffle}.
     *
     * <p>{@code source} has parallelism 5 and carries the serialized operator coordinator {@code
     * provider}. It connects POINTWISE to {@code middle}, which also has parallelism 5. {@code
     * middle} connects ALL_TO_ALL to {@code sink}, whose parallelism is left unset.
     *
     * @return the job graph with id {@code JOB_ID} and name "TestJob"
     * @throws IOException if the coordinator provider cannot be serialized
     */
    private JobGraph createDefaultHybridJobGraph() throws IOException {
        final JobVertex source = new JobVertex("source", SOURCE_ID);
        source.setInvokableClass(NoOpInvokable.class);
        source.addOperatorCoordinator(new SerializedValue<>(this.provider));
        source.setParallelism(5);

        final JobVertex middle = new JobVertex("middle", MIDDLE_ID);
        middle.setInvokableClass(NoOpInvokable.class);
        middle.setParallelism(5);

        final JobVertex sink = new JobVertex("sink", SINK_ID);
        sink.setInvokableClass(NoOpInvokable.class);

        JobVertexConnectionUtils.connectNewDataSetAsInput(
                middle, source, DistributionPattern.POINTWISE, ResultPartitionType.HYBRID_FULL);
        JobVertexConnectionUtils.connectNewDataSetAsInput(
                sink, middle, DistributionPattern.ALL_TO_ALL, ResultPartitionType.HYBRID_FULL);

        return new JobGraph(JOB_ID, "TestJob", source, middle, sink);
    }

    /**
     * Returns the element at index {@code i} of the task vertices of job vertex {@code
     * jobVertexID}.
     *
     * @param jobVertexID the job vertex to look up
     * @param i index into the job vertex's task vertices
     * @param executionGraph graph containing the job vertex
     * @return the selected execution vertex
     */
    private static ExecutionVertex getExecutionVertex(
            JobVertexID jobVertexID, int i, ExecutionGraph executionGraph) {
        final List<ExecutionVertex> taskVertices =
                getExecutionVertices(jobVertexID, executionGraph);
        return taskVertices.get(i);
    }

    /**
     * Returns the task vertices of job vertex {@code jobVertexID} as a fixed-size list view. Fails
     * with an {@link IllegalStateException} (via {@code Preconditions.checkState}) if the job
     * vertex is not initialized.
     *
     * @param jobVertexID the job vertex to look up
     * @param executionGraph graph containing the job vertex
     * @return the job vertex's task vertices
     */
    private static List<ExecutionVertex> getExecutionVertices(
            JobVertexID jobVertexID, ExecutionGraph executionGraph) {
        final ExecutionJobVertex jobVertex = executionGraph.getJobVertex(jobVertexID);
        Preconditions.checkState(jobVertex.isInitialized());
        return Arrays.asList(jobVertex.getTaskVertices());
    }

    /**
     * Returns the attempt id of the current execution of each task vertex of {@code
     * executionJobVertex}, in task-vertex order.
     *
     * @param executionJobVertex an initialized execution job vertex
     * @return current attempt ids, one per task vertex
     */
    private static List<ExecutionAttemptID> getCurrentAttemptIds(
            ExecutionJobVertex executionJobVertex) {
        Preconditions.checkState(executionJobVertex.isInitialized());
        return Arrays.stream(executionJobVertex.getTaskVertices())
                .map(vertex -> vertex.getCurrentExecutionAttempt().getAttemptId())
                .collect(Collectors.toList());
    }

    /**
     * Creates a scheduler for {@code jobGraph} with the default snapshot minimum pause.
     *
     * @param jobGraph the job to schedule
     * @return the built scheduler
     * @throws Exception if building the scheduler fails
     */
    private AdaptiveBatchScheduler createScheduler(JobGraph jobGraph) throws Exception {
        final Duration defaultSnapshotMinPause =
                BatchExecutionOptions.JOB_RECOVERY_SNAPSHOT_MIN_PAUSE.defaultValue();
        return createScheduler(jobGraph, defaultSnapshotMinPause);
    }

    /**
     * Creates a scheduler for {@code jobGraph} backed by {@code jobEventStore}. It uses the
     * default max parallelism and the given snapshot minimum pause.
     *
     * @param jobGraph the job to schedule
     * @param duration snapshot minimum pause
     * @return the built scheduler
     * @throws Exception if building the scheduler fails
     */
    private AdaptiveBatchScheduler createScheduler(JobGraph jobGraph, Duration duration)
            throws Exception {
        final int defaultMaxParallelism =
                BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MAX_PARALLELISM.defaultValue();
        return createScheduler(jobGraph, this.jobEventStore, defaultMaxParallelism, duration);
    }

    /**
     * Creates an {@link AdaptiveBatchScheduler} with batch job recovery enabled.
     *
     * <p>The scheduler is wired as follows:
     *
     * <ul>
     *   <li>It uses a {@link NettyShuffleMaster} whose job master gateway answers
     *       partition-with-metrics requests with {@code allPartitionWithMetrics}.
     *   <li>It uses a fixed-delay restart strategy created with {@code (10, 0L)}.
     *   <li>It uses the parallelism decider from {@code createCustomParallelismDecider(2)}.
     *   <li>When {@code isBlockingShuffle} is false, all exchanges are configured as HYBRID_FULL
     *       with {@code DummyTierFactory} as the external remote tier.
     * </ul>
     *
     * @param jobGraph the job to schedule
     * @param jobEventStore store backing the recovery handler's {@link JobEventManager}
     * @param i default max parallelism passed to the scheduler builder
     * @param duration value for {@code JOB_RECOVERY_SNAPSHOT_MIN_PAUSE}
     * @return the built scheduler
     * @throws Exception if building the scheduler fails
     */
    private AdaptiveBatchScheduler createScheduler(
            JobGraph jobGraph, JobEventStore jobEventStore, int i, Duration duration)
            throws Exception {
        final Configuration configuration = new Configuration();
        configuration.set(BatchExecutionOptions.JOB_RECOVERY_SNAPSHOT_MIN_PAUSE, duration);
        configuration.set(BatchExecutionOptions.JOB_RECOVERY_ENABLED, true);
        configuration.set(
                BatchExecutionOptions.JOB_RECOVERY_PREVIOUS_WORKER_RECOVERY_TIMEOUT,
                this.previousWorkerRecoveryTimeout);
        if (!this.isBlockingShuffle) {
            configuration.set(
                    ExecutionOptions.BATCH_SHUFFLE_MODE, BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL);
            configuration.set(
                    NettyShuffleEnvironmentOptions
                            .NETWORK_HYBRID_SHUFFLE_EXTERNAL_REMOTE_TIER_FACTORY_CLASS_NAME,
                    DummyTierFactory.class.getName());
        }

        // NOTE(review): NettyShuffleMaster is assumed to be non-generic. The diamond "<>" is a
        // compile error on a non-generic class, so it is instantiated without one.
        // Fatal errors reported through the shuffle master context are ignored here.
        final ShuffleMaster<?> nettyShuffleMaster =
                new NettyShuffleMaster(new ShuffleMasterContextImpl(configuration, ignored -> {}));
        nettyShuffleMaster.registerJob(
                new JobShuffleContextImpl(
                        jobGraph.getJobID(),
                        new TestingJobMasterGatewayBuilder()
                                .setGetPartitionWithMetricsFunction(
                                        (unusedDuration, unusedSet) ->
                                                CompletableFuture.completedFuture(
                                                        this.allPartitionWithMetrics))
                                .build()));

        return new DefaultSchedulerBuilder(
                        jobGraph,
                        this.mainThreadExecutor.getMainThreadExecutor(),
                        (ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor())
                .setRestartBackoffTimeStrategy(
                        new FixedDelayRestartBackoffTimeStrategy
                                        .FixedDelayRestartBackoffTimeStrategyFactory(10, 0L)
                                .create())
                .setShuffleMaster(nettyShuffleMaster)
                .setJobMasterConfiguration(configuration)
                .setPartitionTracker(
                        new JobMasterPartitionTrackerImpl(
                                jobGraph.getJobID(),
                                nettyShuffleMaster,
                                resourceID -> Optional.empty()))
                .setDelayExecutor(this.delayedExecutor)
                .setJobRecoveryHandler(
                        new DefaultBatchJobRecoveryHandler(
                                new JobEventManager(jobEventStore),
                                configuration,
                                jobGraph.getJobID()))
                .setVertexParallelismAndInputInfosDecider(
                        DefaultSchedulerBuilder.createCustomParallelismDecider(2))
                .setDefaultMaxParallelism(i)
                .buildAdaptiveBatchJobScheduler(this.enableSpeculativeExecution);
    }

    /**
     * Serializes {@code jobGraph} with Java object serialization.
     *
     * @param jobGraph the graph to serialize
     * @return the serialized bytes
     * @throws IOException if serialization fails
     */
    private byte[] serializeJobGraph(JobGraph jobGraph) throws IOException {
        final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        // Closing the ObjectOutputStream flushes it, so every written byte reaches the buffer
        // before toByteArray() is called.
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(jobGraph);
        }
        return bytes.toByteArray();
    }

    /**
     * Deserializes a {@link JobGraph} previously produced with Java object serialization. The
     * bytes are expected to come from this test, for example via {@code serializeJobGraph}; do
     * not use this on untrusted input.
     *
     * @param bArr serialized job graph bytes
     * @return the deserialized job graph
     * @throws Exception if reading fails or the bytes do not contain a {@link JobGraph}
     */
    private JobGraph deserializeJobGraph(byte[] bArr) throws Exception {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bArr))) {
            return (JobGraph) in.readObject();
        }
    }
}
