/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.source.coordinator;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.connector.source.ReaderInfo;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorTestBase;
import org.apache.flink.runtime.source.coordinator.TestingSplitEnumerator;
import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
import org.apache.flink.util.function.ThrowingRunnable;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

public class SourceCoordinatorContextTest
extends SourceCoordinatorTestBase {
    @Test
    public void testRegisterReader() throws Exception {
        this.sourceReady();
        List<ReaderInfo> readerInfo = this.registerReaders();
        Assert.assertTrue((boolean)this.context.registeredReaders().containsKey(0));
        Assert.assertTrue((boolean)this.context.registeredReaders().containsKey(1));
        Assert.assertEquals((Object)readerInfo.get(0), this.context.registeredReaders().get(0));
        Assert.assertEquals((Object)readerInfo.get(1), this.context.registeredReaders().get(1));
        TestingSplitEnumerator<MockSourceSplit> enumerator = this.getEnumerator();
        Assert.assertThat(enumerator.getRegisteredReaders(), (Matcher)Matchers.containsInAnyOrder((Object[])new Integer[]{0, 1, 2}));
    }

    @Test
    public void testTaskFailureUnregistersReader() throws Exception {
        this.sourceReady();
        List<ReaderInfo> readerInfo = this.registerReaders();
        this.sourceCoordinator.subtaskFailed(0, null);
        this.waitForCoordinatorToProcessActions();
        Assert.assertEquals((String)"Only reader 2 should be registered.", (long)2L, (long)this.context.registeredReaders().size());
        Assert.assertNull(this.context.registeredReaders().get(0));
        Assert.assertEquals((Object)readerInfo.get(1), this.context.registeredReaders().get(1));
        Assert.assertEquals((Object)readerInfo.get(2), this.context.registeredReaders().get(2));
    }

    @Test
    public void testUnregisterUnregisteredReader() {
        this.context.unregisterSourceReader(0);
    }

    @Test
    public void testAssignSplitsFromCoordinatorExecutor() throws Exception {
        this.testAssignSplits(true);
    }

    @Test
    public void testAssignSplitsFromOtherThread() throws Exception {
        this.testAssignSplits(false);
    }

    private void testAssignSplits(boolean fromCoordinatorExecutor) throws Exception {
        this.sourceReady();
        this.registerReaders();
        SplitsAssignment<MockSourceSplit> splitsAssignment = CoordinatorTestUtils.getSplitsAssignment(2, 0);
        if (fromCoordinatorExecutor) {
            this.coordinatorExecutor.submit(() -> this.context.assignSplits(splitsAssignment)).get();
        } else {
            this.context.assignSplits(splitsAssignment);
        }
        CoordinatorTestUtils.verifyAssignment(Collections.singletonList("0"), (Collection)this.splitSplitAssignmentTracker.uncheckpointedAssignments().get(0));
        CoordinatorTestUtils.verifyAssignment(Arrays.asList("1", "2"), (Collection)this.splitSplitAssignmentTracker.uncheckpointedAssignments().get(1));
        Assert.assertEquals((String)"There should be two events sent to the subtasks.", (long)2L, (long)this.receivingTasks.getNumberOfSentEvents());
        List<OperatorEvent> eventsToSubtask0 = this.receivingTasks.getSentEventsForSubtask(0);
        Assert.assertEquals((long)1L, (long)eventsToSubtask0.size());
        OperatorEvent event = eventsToSubtask0.get(0);
        Assert.assertTrue((boolean)(event instanceof AddSplitEvent));
        CoordinatorTestUtils.verifyAssignment(Collections.singletonList("0"), ((AddSplitEvent)event).splits((SimpleVersionedSerializer)new MockSourceSplitSerializer()));
    }

    @Test
    public void testAssignSplitToUnregisteredReaderFromCoordinatorExecutor() throws Exception {
        this.testAssignSplitToUnregisterdReader(true);
    }

    @Test
    public void testAssignSplitToUnregisteredReaderFromOtherThread() throws Exception {
        this.testAssignSplitToUnregisterdReader(false);
    }

    private void testAssignSplitToUnregisterdReader(boolean fromCoordinatorExecutor) throws Exception {
        this.sourceReady();
        SplitsAssignment<MockSourceSplit> splitsAssignment = CoordinatorTestUtils.getSplitsAssignment(2, 0);
        CoordinatorTestUtils.verifyException((ThrowingRunnable<Throwable>)((ThrowingRunnable)() -> {
            if (fromCoordinatorExecutor) {
                this.coordinatorExecutor.submit(() -> this.context.assignSplits(splitsAssignment)).get();
            } else {
                this.context.assignSplits(splitsAssignment);
            }
        }), "assignSplits() should fail to assign the splits to a reader that is not registered.", "Cannot assign splits " + splitsAssignment.assignment().get(0));
    }

    @Test
    public void testExceptionInRunnableFailsTheJob() throws InterruptedException, ExecutionException {
        ManuallyTriggeredScheduledExecutorService manualWorkerExecutor = new ManuallyTriggeredScheduledExecutorService();
        ManuallyTriggeredScheduledExecutorService coordinatorExecutorWithExceptionHandler = new ManuallyTriggeredScheduledExecutorService();
        SourceCoordinatorContext testingContext = new SourceCoordinatorContext((ScheduledExecutorService)coordinatorExecutorWithExceptionHandler, (ScheduledExecutorService)manualWorkerExecutor, this.coordinatorThreadFactory, (OperatorCoordinator.Context)this.operatorCoordinatorContext, (SimpleVersionedSerializer)new MockSourceSplitSerializer(), this.splitSplitAssignmentTracker);
        testingContext.runInCoordinatorThread(() -> {
            throw new RuntimeException();
        });
        manualWorkerExecutor.triggerAll();
        testingContext.close();
        coordinatorExecutorWithExceptionHandler.triggerAll();
        this.operatorCoordinatorContext.getJobFailedFuture().get();
        Assert.assertTrue((boolean)this.operatorCoordinatorContext.isJobFailed());
    }

    @Test
    public void testCallableInterruptedDuringShutdownDoNotFailJob() throws InterruptedException {
        AtomicReference<Object> expectedError = new AtomicReference<Object>(null);
        ManuallyTriggeredScheduledExecutorService manualWorkerExecutor = new ManuallyTriggeredScheduledExecutorService();
        ManuallyTriggeredScheduledExecutorService manualCoordinatorExecutor = new ManuallyTriggeredScheduledExecutorService();
        SourceCoordinatorContext testingContext = new SourceCoordinatorContext((ScheduledExecutorService)manualCoordinatorExecutor, (ScheduledExecutorService)manualWorkerExecutor, new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(TEST_OPERATOR_ID.toHexString(), (OperatorCoordinator.Context)this.operatorCoordinatorContext), (OperatorCoordinator.Context)this.operatorCoordinatorContext, (SimpleVersionedSerializer)new MockSourceSplitSerializer(), this.splitSplitAssignmentTracker);
        testingContext.callAsync(() -> {
            throw new InterruptedException();
        }, (ignored, e) -> {
            if (e != null) {
                expectedError.set(e);
                throw new RuntimeException((Throwable)e);
            }
        });
        manualWorkerExecutor.triggerAll();
        testingContext.close();
        manualCoordinatorExecutor.triggerAll();
        Assert.assertTrue((boolean)(expectedError.get() instanceof InterruptedException));
        Assert.assertFalse((boolean)this.operatorCoordinatorContext.isJobFailed());
    }

    private List<ReaderInfo> registerReaders() {
        List<ReaderInfo> infos = Arrays.asList(new ReaderInfo(0, "subtask_0_location"), new ReaderInfo(1, "subtask_1_location"), new ReaderInfo(2, "subtask_2_location"));
        for (ReaderInfo info : infos) {
            this.sourceCoordinator.handleEventFromOperator(info.getSubtaskId(), (OperatorEvent)new ReaderRegistrationEvent(info.getSubtaskId(), info.getLocation()));
        }
        this.waitForCoordinatorToProcessActions();
        return infos;
    }
}

