package org.apache.flink.runtime.source.coordinator;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.connector.source.ReaderInfo;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider;
import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.class */
/**
 * Unit tests for {@link SourceCoordinatorContext}.
 *
 * <p>The fixtures used here ({@code context}, {@code sourceCoordinator}, {@code
 * coordinatorExecutor}, {@code receivingTasks}, {@code splitSplitAssignmentTracker}, {@code
 * operatorCoordinatorContext}, ...) are inherited from {@link SourceCoordinatorTestBase}.
 */
class SourceCoordinatorContextTest extends SourceCoordinatorTestBase {

    /** Registering readers must make them visible to both the context and the enumerator. */
    @Test
    void testRegisterReader() throws Exception {
        sourceReady();
        List<ReaderInfo> readerInfo = registerReaders();

        Assertions.assertThat(context.registeredReaders()).containsKey(0);
        Assertions.assertThat(context.registeredReaders()).containsKey(1);
        Assertions.assertThat(context.registeredReaders().get(0)).isEqualTo(readerInfo.get(0));
        Assertions.assertThat(context.registeredReaders().get(1)).isEqualTo(readerInfo.get(1));
        Assertions.assertThat(getEnumerator().getRegisteredReaders())
                .containsExactlyInAnyOrder(0, 1, 2);
    }

    /** Failing the execution attempt of subtask 0 must unregister only that reader. */
    @Test
    void testTaskFailureUnregistersReader() throws Exception {
        sourceReady();
        List<ReaderInfo> readerInfo = registerReaders();

        sourceCoordinator.executionAttemptFailed(0, 0, null);
        waitForCoordinatorToProcessActions();

        Assertions.assertThat(context.registeredReaders())
                .as("Only readers 1 and 2 should remain registered.")
                .hasSize(2);
        Assertions.assertThat(context.registeredReaders().get(0)).isNull();
        Assertions.assertThat(context.registeredReaders().get(1)).isEqualTo(readerInfo.get(1));
        Assertions.assertThat(context.registeredReaders().get(2)).isEqualTo(readerInfo.get(2));
    }

    /** Unregistering a reader that was never registered must simply not throw. */
    @Test
    void testUnregisterUnregisteredReader() {
        context.unregisterSourceReader(0, 0);
    }

    @Test
    void testAssignSplitsFromCoordinatorExecutor() throws Exception {
        testAssignSplits(true);
    }

    @Test
    void testAssignSplitsFromOtherThread() throws Exception {
        testAssignSplits(false);
    }

    /**
     * Assigns splits to the registered readers and verifies both the tracked assignment and the
     * events sent to subtask 0.
     *
     * @param fromCoordinatorExecutor if {@code true}, {@code assignSplits} is submitted to the
     *     coordinator executor; otherwise it is invoked directly from the calling thread
     */
    private void testAssignSplits(boolean fromCoordinatorExecutor) throws Exception {
        sourceReady();
        registerReaders();

        SplitsAssignment<MockSourceSplit> splitsAssignment =
                CoordinatorTestUtils.getSplitsAssignment(2, 0);
        if (fromCoordinatorExecutor) {
            coordinatorExecutor.submit(() -> context.assignSplits(splitsAssignment)).get();
        } else {
            context.assignSplits(splitsAssignment);
        }

        // The assignment must be recorded by the tracker as not yet checkpointed.
        CoordinatorTestUtils.verifyAssignment(
                Collections.singletonList("0"),
                splitSplitAssignmentTracker.uncheckpointedAssignments().get(0));
        CoordinatorTestUtils.verifyAssignment(
                Arrays.asList("1", "2"),
                splitSplitAssignmentTracker.uncheckpointedAssignments().get(1));

        Assertions.assertThat(receivingTasks.getNumberOfSentEvents())
                .as("There should be two events sent to the subtasks.")
                .isEqualTo(2);

        // Inspect the single event that was sent to subtask 0.
        List<OperatorEvent> eventsToSubtask0 = receivingTasks.getSentEventsForSubtask(0);
        Assertions.assertThat(eventsToSubtask0).hasSize(1);
        OperatorEvent event = eventsToSubtask0.get(0);
        Assertions.assertThat(event).isInstanceOf(AddSplitEvent.class);
        // Safe: the runtime type was asserted on the line above; only the type argument is
        // unchecked.
        @SuppressWarnings("unchecked")
        AddSplitEvent<MockSourceSplit> addSplitEvent = (AddSplitEvent<MockSourceSplit>) event;
        CoordinatorTestUtils.verifyAssignment(
                Collections.singletonList("0"),
                addSplitEvent.splits(new MockSourceSplitSerializer()));
    }

    @Test
    void testAssignSplitToUnregisteredReaderFromCoordinatorExecutor() throws Exception {
        testAssignSplitToUnregisteredReader(true);
    }

    @Test
    void testAssignSplitToUnregisteredReaderFromOtherThread() throws Exception {
        testAssignSplitToUnregisteredReader(false);
    }

    /**
     * Verifies that assigning splits fails when no reader has been registered.
     *
     * @param fromCoordinatorExecutor if {@code true}, {@code assignSplits} is submitted to the
     *     coordinator executor; otherwise it is invoked directly from the calling thread
     */
    private void testAssignSplitToUnregisteredReader(boolean fromCoordinatorExecutor)
            throws Exception {
        sourceReady();

        SplitsAssignment<MockSourceSplit> splitsAssignment =
                CoordinatorTestUtils.getSplitsAssignment(2, 0);
        CoordinatorTestUtils.verifyException(
                () -> {
                    if (fromCoordinatorExecutor) {
                        coordinatorExecutor
                                .submit(() -> context.assignSplits(splitsAssignment))
                                .get();
                    } else {
                        context.assignSplits(splitsAssignment);
                    }
                },
                "assignSplits() should fail to assign the splits to a reader that is not registered.",
                "Cannot assign splits " + splitsAssignment.assignment().get(0));
    }

    /** A runnable that throws inside the coordinator thread must fail the job. */
    @Test
    void testExceptionInRunnableFailsTheJob() throws InterruptedException, ExecutionException {
        ManuallyTriggeredScheduledExecutorService manualWorkerExecutor =
                new ManuallyTriggeredScheduledExecutorService();
        ManuallyTriggeredScheduledExecutorService manualCoordinatorExecutor =
                new ManuallyTriggeredScheduledExecutorService();
        SourceCoordinatorContext<MockSourceSplit> testingContext =
                new SourceCoordinatorContext<>(
                        manualCoordinatorExecutor,
                        manualWorkerExecutor,
                        coordinatorThreadFactory,
                        operatorCoordinatorContext,
                        new MockSourceSplitSerializer(),
                        splitSplitAssignmentTracker,
                        false);

        testingContext.runInCoordinatorThread(
                () -> {
                    throw new RuntimeException();
                });

        manualWorkerExecutor.triggerAll();
        // The context is closed before the coordinator executor is drained.
        testingContext.close();
        manualCoordinatorExecutor.triggerAll();

        // Blocks until the job-failed future completes.
        operatorCoordinatorContext.getJobFailedFuture().get();
        Assertions.assertThat(operatorCoordinatorContext.isJobFailed()).isTrue();
    }

    /**
     * An {@link InterruptedException} thrown by an async callable must reach the handler but must
     * not fail the job.
     */
    @Test
    void testCallableInterruptedDuringShutdownDoNotFailJob() throws InterruptedException {
        AtomicReference<Throwable> expectedError = new AtomicReference<>();

        ManuallyTriggeredScheduledExecutorService manualWorkerExecutor =
                new ManuallyTriggeredScheduledExecutorService();
        ManuallyTriggeredScheduledExecutorService manualCoordinatorExecutor =
                new ManuallyTriggeredScheduledExecutorService();
        SourceCoordinatorContext<MockSourceSplit> testingContext =
                new SourceCoordinatorContext<>(
                        manualCoordinatorExecutor,
                        manualWorkerExecutor,
                        new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(
                                TEST_OPERATOR_ID.toHexString(), operatorCoordinatorContext),
                        operatorCoordinatorContext,
                        new MockSourceSplitSerializer(),
                        splitSplitAssignmentTracker,
                        false);

        testingContext.callAsync(
                () -> {
                    throw new InterruptedException();
                },
                (ignored, error) -> {
                    if (error != null) {
                        // Record the failure for the assertion below, then rethrow it.
                        expectedError.set(error);
                        throw new RuntimeException(error);
                    }
                });

        manualWorkerExecutor.triggerAll();
        testingContext.close();
        manualCoordinatorExecutor.triggerAll();

        Assertions.assertThat(expectedError.get()).isInstanceOf(InterruptedException.class);
        Assertions.assertThat(operatorCoordinatorContext.isJobFailed()).isFalse();
    }

    /**
     * Intermediate "no more splits" signals must not mark a reader as finished; only the final
     * signal does.
     */
    @Test
    void testSupportsIntermediateNoMoreSplits() throws Exception {
        sourceReady();
        registerReaders();

        context.assignSplits(CoordinatorTestUtils.getSplitsAssignment(2, 0));
        context.signalIntermediateNoMoreSplits(0);
        context.signalIntermediateNoMoreSplits(1);
        Assertions.assertThat(context.hasNoMoreSplits(0)).isFalse();
        Assertions.assertThat(context.hasNoMoreSplits(1)).isFalse();
        Assertions.assertThat(context.hasNoMoreSplits(2)).isFalse();

        context.signalNoMoreSplits(0);
        context.signalNoMoreSplits(1);
        Assertions.assertThat(context.hasNoMoreSplits(0)).isTrue();
        Assertions.assertThat(context.hasNoMoreSplits(1)).isTrue();
        Assertions.assertThat(context.hasNoMoreSplits(2)).isFalse();
    }

    /**
     * Registers three readers (subtasks 0, 1 and 2) through the source coordinator and waits until
     * the coordinator has processed the registrations.
     *
     * @return the reader infos that were registered, in subtask order
     */
    private List<ReaderInfo> registerReaders() {
        List<ReaderInfo> readerInfos =
                Arrays.asList(
                        new ReaderInfo(0, "subtask_0_location"),
                        new ReaderInfo(1, "subtask_1_location"),
                        new ReaderInfo(2, "subtask_2_location"));
        for (ReaderInfo info : readerInfos) {
            sourceCoordinator.handleEventFromOperator(
                    info.getSubtaskId(),
                    0,
                    new ReaderRegistrationEvent(info.getSubtaskId(), info.getLocation()));
        }
        waitForCoordinatorToProcessActions();
        return readerInfos;
    }
}
