/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.source.coordinator;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.connector.source.ReaderInfo;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorTestBase;
import org.apache.flink.runtime.source.coordinator.TestingSplitEnumerator;
import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.runtime.source.event.IsProcessingBacklogEvent;
import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
import org.apache.flink.util.function.ThrowingRunnable;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.MapAssert;
import org.junit.jupiter.api.Test;

class SourceCoordinatorContextTest
extends SourceCoordinatorTestBase {
    SourceCoordinatorContextTest() {
    }

    @Test
    void testRegisterReader() throws Exception {
        this.sourceReady();
        List<ReaderInfo> readerInfo = this.registerReaders();
        Assertions.assertThat((Map)this.context.registeredReaders()).containsKey((Object)0);
        Assertions.assertThat((Map)this.context.registeredReaders()).containsKey((Object)1);
        Assertions.assertThat((Object)((ReaderInfo)this.context.registeredReaders().get(0))).isEqualTo((Object)readerInfo.get(0));
        Assertions.assertThat((Object)((ReaderInfo)this.context.registeredReaders().get(1))).isEqualTo((Object)readerInfo.get(1));
        TestingSplitEnumerator<MockSourceSplit> enumerator = this.getEnumerator();
        Assertions.assertThat(enumerator.getRegisteredReaders()).containsExactlyInAnyOrder((Object[])new Integer[]{0, 1, 2});
        ReaderInfo readerInfoOfSubtask1 = ReaderInfo.createReaderInfo((int)1, (String)"subtask_1_location", Collections.singletonList(new MockSourceSplit(1)));
        this.sourceCoordinator.subtaskReset(1, 1L);
        this.sourceCoordinator.handleEventFromOperator(1, 1, (OperatorEvent)ReaderRegistrationEvent.createReaderRegistrationEvent((int)readerInfoOfSubtask1.getSubtaskId(), (String)readerInfoOfSubtask1.getLocation(), (List)readerInfoOfSubtask1.getReportedSplitsOnRegistration(), (SimpleVersionedSerializer)new MockSourceSplitSerializer()));
        this.waitForCoordinatorToProcessActions();
        Assertions.assertThat((Object)((ReaderInfo)this.context.registeredReaders().get(1))).isEqualTo((Object)readerInfoOfSubtask1);
    }

    @Test
    void testTaskFailureUnregistersReader() throws Exception {
        this.sourceReady();
        List<ReaderInfo> readerInfo = this.registerReaders();
        this.sourceCoordinator.executionAttemptFailed(0, 0, null);
        this.waitForCoordinatorToProcessActions();
        ((MapAssert)Assertions.assertThat((Map)this.context.registeredReaders()).as("Only reader 2 should be registered.", new Object[0])).hasSize(2);
        Assertions.assertThat((Object)((ReaderInfo)this.context.registeredReaders().get(0))).isNull();
        Assertions.assertThat((Object)((ReaderInfo)this.context.registeredReaders().get(1))).isEqualTo((Object)readerInfo.get(1));
        Assertions.assertThat((Object)((ReaderInfo)this.context.registeredReaders().get(2))).isEqualTo((Object)readerInfo.get(2));
    }

    @Test
    void testUnregisterUnregisteredReader() {
        this.context.unregisterSourceReader(0, 0);
    }

    @Test
    void testAssignSplitsFromCoordinatorExecutor() throws Exception {
        this.testAssignSplits(true);
    }

    @Test
    void testAssignSplitsFromOtherThread() throws Exception {
        this.testAssignSplits(false);
    }

    private void testAssignSplits(boolean fromCoordinatorExecutor) throws Exception {
        this.sourceReady();
        this.registerReaders();
        SplitsAssignment<MockSourceSplit> splitsAssignment = CoordinatorTestUtils.getSplitsAssignment(2, 0);
        if (fromCoordinatorExecutor) {
            this.context.submitTask(() -> this.context.assignSplits(splitsAssignment)).get();
        } else {
            this.context.assignSplits(splitsAssignment);
        }
        CoordinatorTestUtils.verifyAssignment(Collections.singletonList("0"), (Collection)this.splitSplitAssignmentTracker.uncheckpointedAssignments().get(0));
        CoordinatorTestUtils.verifyAssignment(Arrays.asList("1", "2"), (Collection)this.splitSplitAssignmentTracker.uncheckpointedAssignments().get(1));
        ((AbstractIntegerAssert)Assertions.assertThat((int)this.receivingTasks.getNumberOfSentEvents()).as("There should be two events sent to the subtasks.", new Object[0])).isEqualTo(2);
        List<OperatorEvent> eventsToSubtask0 = this.receivingTasks.getSentEventsForSubtask(0);
        Assertions.assertThat(eventsToSubtask0).hasSize(1);
        OperatorEvent event = eventsToSubtask0.get(0);
        Assertions.assertThat((Object)event).isInstanceOf(AddSplitEvent.class);
        CoordinatorTestUtils.verifyAssignment(Collections.singletonList("0"), ((AddSplitEvent)event).splits((SimpleVersionedSerializer)new MockSourceSplitSerializer()));
    }

    @Test
    void testAssignSplitToUnregisteredReaderFromCoordinatorExecutor() throws Exception {
        this.testAssignSplitToUnregisterdReader(true);
    }

    @Test
    void testAssignSplitToUnregisteredReaderFromOtherThread() throws Exception {
        this.testAssignSplitToUnregisterdReader(false);
    }

    private void testAssignSplitToUnregisterdReader(boolean fromCoordinatorExecutor) throws Exception {
        this.sourceReady();
        SplitsAssignment<MockSourceSplit> splitsAssignment = CoordinatorTestUtils.getSplitsAssignment(2, 0);
        CoordinatorTestUtils.verifyException((ThrowingRunnable<Throwable>)((ThrowingRunnable)() -> {
            if (fromCoordinatorExecutor) {
                this.context.submitTask(() -> this.context.assignSplits(splitsAssignment)).get();
            } else {
                this.context.assignSplits(splitsAssignment);
            }
        }), "assignSplits() should fail to assign the splits to a reader that is not registered.", "Cannot assign splits " + splitsAssignment.assignment().get(0));
    }

    @Test
    void testExceptionInRunnableFailsTheJob() throws InterruptedException, ExecutionException {
        ManuallyTriggeredScheduledExecutorService manualWorkerExecutor = new ManuallyTriggeredScheduledExecutorService();
        ManuallyTriggeredScheduledExecutorService coordinatorExecutorWithExceptionHandler = new ManuallyTriggeredScheduledExecutorService();
        SourceCoordinatorContext testingContext = new SourceCoordinatorContext(new JobID(), (ScheduledExecutorService)coordinatorExecutorWithExceptionHandler, (ScheduledExecutorService)manualWorkerExecutor, new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(this.coordinatorThreadName, (OperatorCoordinator.Context)this.operatorCoordinatorContext), (OperatorCoordinator.Context)this.operatorCoordinatorContext, (SimpleVersionedSerializer)new MockSourceSplitSerializer(), this.splitSplitAssignmentTracker, false);
        testingContext.runInCoordinatorThread(() -> {
            throw new RuntimeException();
        });
        manualWorkerExecutor.triggerAll();
        testingContext.close();
        coordinatorExecutorWithExceptionHandler.triggerAll();
        this.operatorCoordinatorContext.getJobFailedFuture().get();
        Assertions.assertThat((boolean)this.operatorCoordinatorContext.isJobFailed()).isTrue();
    }

    @Test
    void testCallableInterruptedDuringShutdownDoNotFailJob() throws InterruptedException {
        AtomicReference<Object> expectedError = new AtomicReference<Object>(null);
        ManuallyTriggeredScheduledExecutorService manualWorkerExecutor = new ManuallyTriggeredScheduledExecutorService();
        ManuallyTriggeredScheduledExecutorService manualCoordinatorExecutor = new ManuallyTriggeredScheduledExecutorService();
        SourceCoordinatorContext testingContext = new SourceCoordinatorContext(new JobID(), (ScheduledExecutorService)manualCoordinatorExecutor, (ScheduledExecutorService)manualWorkerExecutor, new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(TEST_OPERATOR_ID.toHexString(), (OperatorCoordinator.Context)this.operatorCoordinatorContext), (OperatorCoordinator.Context)this.operatorCoordinatorContext, (SimpleVersionedSerializer)new MockSourceSplitSerializer(), this.splitSplitAssignmentTracker, false);
        testingContext.callAsync(() -> {
            throw new InterruptedException();
        }, (ignored, e) -> {
            if (e != null) {
                expectedError.set(e);
                throw new RuntimeException((Throwable)e);
            }
        });
        manualWorkerExecutor.triggerAll();
        testingContext.close();
        manualCoordinatorExecutor.triggerAll();
        Assertions.assertThat((Throwable)expectedError.get()).isInstanceOf(InterruptedException.class);
        Assertions.assertThat((boolean)this.operatorCoordinatorContext.isJobFailed()).isFalse();
    }

    @Test
    void testSupportsIntermediateNoMoreSplits() throws Exception {
        this.sourceReady();
        this.registerReaders();
        SplitsAssignment<MockSourceSplit> splitsAssignment = CoordinatorTestUtils.getSplitsAssignment(2, 0);
        this.context.assignSplits(splitsAssignment);
        this.context.signalIntermediateNoMoreSplits(0);
        this.context.signalIntermediateNoMoreSplits(1);
        Assertions.assertThat((boolean)this.context.hasNoMoreSplits(0)).isFalse();
        Assertions.assertThat((boolean)this.context.hasNoMoreSplits(1)).isFalse();
        Assertions.assertThat((boolean)this.context.hasNoMoreSplits(2)).isFalse();
        this.context.signalNoMoreSplits(0);
        this.context.signalNoMoreSplits(1);
        Assertions.assertThat((boolean)this.context.hasNoMoreSplits(0)).isTrue();
        Assertions.assertThat((boolean)this.context.hasNoMoreSplits(1)).isTrue();
        Assertions.assertThat((boolean)this.context.hasNoMoreSplits(2)).isFalse();
    }

    private List<ReaderInfo> registerReaders() {
        List<ReaderInfo> infos = Arrays.asList(new ReaderInfo(0, "subtask_0_location"), new ReaderInfo(1, "subtask_1_location"), new ReaderInfo(2, "subtask_2_location"));
        for (ReaderInfo info : infos) {
            this.sourceCoordinator.handleEventFromOperator(info.getSubtaskId(), 0, (OperatorEvent)new ReaderRegistrationEvent(info.getSubtaskId(), info.getLocation()));
        }
        this.waitForCoordinatorToProcessActions();
        return infos;
    }

    @Test
    void testSetIsProcessingBacklog() throws Exception {
        List<OperatorEvent> events;
        int i;
        this.sourceReady();
        this.registerReader(0, 0);
        this.context.setIsProcessingBacklog(true);
        for (i = 0; i < this.context.currentParallelism(); ++i) {
            events = this.receivingTasks.getSentEventsForSubtask(i);
            Assertions.assertThat((Object)events.get(events.size() - 1)).isEqualTo((Object)new IsProcessingBacklogEvent(true));
        }
        this.registerReader(1, 0);
        this.context.setIsProcessingBacklog(false);
        this.registerReader(2, 0);
        for (i = 0; i < this.context.currentParallelism(); ++i) {
            events = this.receivingTasks.getSentEventsForSubtask(i);
            Assertions.assertThat((Object)events.get(events.size() - 1)).isEqualTo((Object)new IsProcessingBacklogEvent(false));
        }
    }
}

