/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.source.coordinator;

import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.api.connector.source.mocks.MockSplitEnumerator;
import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorCheckpointSerializer;
import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils;
import org.apache.flink.runtime.source.coordinator.SourceCoordinator;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorProviderTest;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorTestBase;
import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
import org.apache.flink.runtime.source.event.SourceEventWrapper;
import org.apache.flink.util.function.ThrowingRunnable;
import org.junit.Assert;
import org.junit.Test;

public class SourceCoordinatorTest
extends SourceCoordinatorTestBase {
    /**
     * Calls several coordinator callbacks without starting the coordinator first and expects
     * each of them to be reported by {@code CoordinatorTestUtils.verifyException} with the
     * "not started" error message.
     */
    @Test
    public void testThrowExceptionWhenNotStarted() {
        final String failureMessage = "Call should fail when source coordinator has not started yet.";
        final String expectedError = "The coordinator has not started yet.";

        CoordinatorTestUtils.verifyException(
                () -> this.sourceCoordinator.notifyCheckpointComplete(100L),
                failureMessage,
                expectedError);
        CoordinatorTestUtils.verifyException(
                () -> this.sourceCoordinator.handleEventFromOperator(0, null),
                failureMessage,
                expectedError);
        CoordinatorTestUtils.verifyException(
                () -> this.sourceCoordinator.subtaskFailed(0, null),
                failureMessage,
                expectedError);
        CoordinatorTestUtils.verifyException(
                () -> this.sourceCoordinator.checkpointCoordinator(100L, new CompletableFuture<>()),
                failureMessage,
                expectedError);
    }

    /**
     * Starts the coordinator and then expects {@code resetToCheckpoint} to be reported as a
     * failure by {@code CoordinatorTestUtils.verifyException}.
     */
    @Test
    public void testRestCheckpointAfterCoordinatorStarted() throws Exception {
        this.sourceCoordinator.start();

        final String failureMessage = "Reset to checkpoint should fail after the coordinator has started";
        final String expectedError = "The coordinator can only be reset if it was not yet started";
        CoordinatorTestUtils.verifyException(
                () -> this.sourceCoordinator.resetToCheckpoint(0L, null),
                failureMessage,
                expectedError);
    }

    /**
     * Starts the coordinator and polls the enumerator until it reports that it has been
     * started. The JUnit timeout of 10 seconds bounds the polling loop.
     */
    @Test(timeout=10000L)
    public void testStart() throws Exception {
        this.sourceCoordinator.start();

        boolean enumeratorStarted = this.getEnumerator().started();
        while (!enumeratorStarted) {
            // Back off briefly before asking the enumerator again.
            Thread.sleep(1L);
            enumeratorStarted = this.getEnumerator().started();
        }
    }

    /**
     * Starts and then closes the coordinator, and checks that the enumerator reports itself
     * as closed afterwards.
     */
    @Test
    public void testClosed() throws Exception {
        this.sourceCoordinator.start();
        this.sourceCoordinator.close();

        final boolean enumeratorClosed = this.getEnumerator().closed();
        Assert.assertTrue(enumeratorClosed);
    }

    /**
     * Registers reader 0 with a started coordinator and then verifies, via {@code check},
     * that four splits remain unassigned, the reader is registered, no source event was
     * handled, and splits "0" and "3" are tracked as uncheckpointed assignments of reader 0.
     */
    @Test
    public void testReaderRegistration() throws Exception {
        this.sourceCoordinator.start();

        final ReaderRegistrationEvent registration = new ReaderRegistrationEvent(0, "location_0");
        this.sourceCoordinator.handleEventFromOperator(0, registration);

        this.check(() -> {
            final int remainingSplits = this.getEnumerator().getUnassignedSplits().size();
            Assert.assertEquals("2 splits should have been assigned to reader 0", 4L, remainingSplits);

            final boolean readerRegistered = this.context.registeredReaders().containsKey(0);
            Assert.assertTrue(readerRegistered);

            final boolean noSourceEventHandled = this.getEnumerator().getHandledSourceEvent().isEmpty();
            Assert.assertTrue(noSourceEventHandled);

            CoordinatorTestUtils.verifyAssignment(
                    Arrays.asList("0", "3"),
                    (Collection)this.splitSplitAssignmentTracker.uncheckpointedAssignments().get(0));
        });
    }

    /**
     * Sends a wrapped {@link SourceEvent} from subtask 0 to a started coordinator and
     * verifies, via {@code check}, that the enumerator recorded exactly that one event.
     */
    @Test
    public void testHandleSourceEvent() throws Exception {
        this.sourceCoordinator.start();

        final SourceEvent sourceEvent = new SourceEvent() {};
        final SourceEventWrapper wrappedEvent = new SourceEventWrapper(sourceEvent);
        this.sourceCoordinator.handleEventFromOperator(0, wrappedEvent);

        this.check(() -> {
            final int handledCount = this.getEnumerator().getHandledSourceEvent().size();
            Assert.assertEquals(1L, handledCount);

            final Object firstHandled = this.getEnumerator().getHandledSourceEvent().get(0);
            Assert.assertEquals(sourceEvent, firstHandled);
        });
    }

    /**
     * Takes checkpoint 100 after reader 0 has registered, resets a new coordinator from the
     * checkpoint bytes, and verifies the restored enumerator and context state.
     */
    @Test
    public void testCheckpointCoordinatorAndRestore() throws Exception {
        this.sourceCoordinator.start();
        this.sourceCoordinator.handleEventFromOperator(0, new ReaderRegistrationEvent(0, "location_0"));

        // Snapshot the coordinator and wait for the serialized state.
        final CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
        this.sourceCoordinator.checkpointCoordinator(100L, checkpointFuture);
        final byte[] bytes = checkpointFuture.get();

        // Feed the snapshot into a new coordinator instance.
        final SourceCoordinator restoredCoordinator = this.getNewSourceCoordinator();
        restoredCoordinator.resetToCheckpoint(100L, bytes);
        final MockSplitEnumerator restoredEnumerator = (MockSplitEnumerator)restoredCoordinator.getEnumerator();
        final SourceCoordinatorContext restoredContext = restoredCoordinator.getContext();

        final int remainingSplits = restoredEnumerator.getUnassignedSplits().size();
        Assert.assertEquals("2 splits should have been assigned to reader 0", 4L, remainingSplits);

        final boolean noSourceEventHandled = restoredEnumerator.getHandledSourceEvent().isEmpty();
        Assert.assertTrue(noSourceEventHandled);

        final int registeredReaderCount = restoredContext.registeredReaders().size();
        Assert.assertEquals(1L, registeredReaderCount);

        final boolean readerRegistered = restoredContext.registeredReaders().containsKey(0);
        Assert.assertTrue(readerRegistered);
    }

    /**
     * Takes two checkpoints (100 and 101) with split assignments to reader 0, then fails and
     * resets subtask 0 and verifies that the reader is unregistered, that no tracked
     * assignment still references it, and that the enumerator holds seven unassigned splits.
     *
     * <p>A failure to deserialize the splits of an {@code AddSplitEvent} is reported as an
     * {@link AssertionError} that carries the {@link IOException} as its cause.
     */
    @Test
    public void testSubtaskFailedAndRevertUncompletedAssignments() throws Exception {
        this.sourceCoordinator.start();

        // Register reader 0, then take checkpoint 100.
        this.sourceCoordinator.handleEventFromOperator(0, (OperatorEvent)new ReaderRegistrationEvent(0, "location_0"));
        CompletableFuture<byte[]> checkpointFuture1 = new CompletableFuture<>();
        this.sourceCoordinator.checkpointCoordinator(100L, checkpointFuture1);
        checkpointFuture1.get();

        // Hand split 6 to the enumerator, then take checkpoint 101.
        this.getEnumerator().addNewSplits(Collections.singletonList(new MockSourceSplit(6)));
        CompletableFuture<byte[]> checkpointFuture2 = new CompletableFuture<>();
        this.sourceCoordinator.checkpointCoordinator(101L, checkpointFuture2);
        checkpointFuture2.get();

        // Verify the state before the failure.
        this.check(() -> {
            Assert.assertEquals(4L, this.getEnumerator().getUnassignedSplits().size());
            CoordinatorTestUtils.verifyAssignment(Arrays.asList("0", "3"), (Collection)((Map)this.splitSplitAssignmentTracker.assignmentsByCheckpointId().get(100L)).get(0));
            Assert.assertTrue(this.splitSplitAssignmentTracker.uncheckpointedAssignments().isEmpty());
            CoordinatorTestUtils.verifyAssignment(Arrays.asList("0", "3"), (Collection)this.splitSplitAssignmentTracker.assignmentsByCheckpointId(100L).get(0));
            CoordinatorTestUtils.verifyAssignment(Arrays.asList("6"), (Collection)this.splitSplitAssignmentTracker.assignmentsByCheckpointId(101L).get(0));

            // Two events are expected for reader 0: one per assignment round.
            List<OperatorEvent> eventsToReader0 = this.operatorCoordinatorContext.getEventsToOperator().get(0);
            Assert.assertEquals(2L, eventsToReader0.size());
            try {
                CoordinatorTestUtils.verifyAssignment(Arrays.asList("0", "3"), ((AddSplitEvent)eventsToReader0.get(0)).splits((SimpleVersionedSerializer)new MockSourceSplitSerializer()));
                CoordinatorTestUtils.verifyAssignment(Arrays.asList("6"), ((AddSplitEvent)eventsToReader0.get(1)).splits((SimpleVersionedSerializer)new MockSourceSplitSerializer()));
            }
            catch (IOException e) {
                // Keep the IOException as the cause so the failure report shows what went wrong.
                throw new AssertionError("Failed to deserialize splits.", e);
            }
        });

        // Fail reader 0 and reset it to checkpoint 99, which is lower than both checkpoints taken above.
        this.sourceCoordinator.subtaskFailed(0, null);
        this.sourceCoordinator.subtaskReset(0, 99L);

        // Verify the state after the failure.
        this.check(() -> {
            Assert.assertFalse("Reader 0 should have been unregistered.", this.context.registeredReaders().containsKey(0));
            for (Map assignment : this.splitSplitAssignmentTracker.assignmentsByCheckpointId().values()) {
                Assert.assertFalse("Assignment in uncompleted checkpoint should have been reverted.", assignment.containsKey(0));
            }
            Assert.assertFalse(this.splitSplitAssignmentTracker.uncheckpointedAssignments().containsKey(0));
            Assert.assertEquals(7L, this.getEnumerator().getUnassignedSplits().size());
        });
    }

    /**
     * Completes checkpoint 100 after reader 0 has registered, then fails subtask 0 and
     * verifies, via {@code check}, that the reader is unregistered, four splits are still
     * unassigned, and the assignment tracker holds nothing for reader 0.
     */
    @Test
    public void testFailedSubtaskDoNotRevertCompletedCheckpoint() throws Exception {
        this.sourceCoordinator.start();
        this.sourceCoordinator.handleEventFromOperator(0, new ReaderRegistrationEvent(0, "location_0"));

        // Take checkpoint 100 and report it as complete.
        final CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
        this.sourceCoordinator.checkpointCoordinator(100L, checkpointFuture);
        checkpointFuture.get();
        this.sourceCoordinator.notifyCheckpointComplete(100L);

        // Wait until the enumerator has recorded a successful checkpoint.
        CommonTestUtils.waitUtil(
                () -> !this.getEnumerator().getSuccessfulCheckpoints().isEmpty(),
                Duration.ofMillis(1000L),
                "The enumerator failed to process the successful checkpoint before times out.");
        final long firstSuccessfulCheckpoint = (Long)this.getEnumerator().getSuccessfulCheckpoints().get(0);
        Assert.assertEquals(100L, firstSuccessfulCheckpoint);

        this.sourceCoordinator.subtaskFailed(0, null);

        this.check(() -> {
            final boolean readerRegistered = this.context.registeredReaders().containsKey(0);
            Assert.assertFalse(readerRegistered);

            final int remainingSplits = this.getEnumerator().getUnassignedSplits().size();
            Assert.assertEquals(4L, remainingSplits);

            final boolean hasUncheckpointedAssignment = this.splitSplitAssignmentTracker.uncheckpointedAssignments().containsKey(0);
            Assert.assertFalse(hasUncheckpointedAssignment);

            final boolean noCheckpointedAssignments = this.splitSplitAssignmentTracker.assignmentsByCheckpointId().isEmpty();
            Assert.assertTrue(noCheckpointedAssignments);
        });
    }

    /**
     * Starts a coordinator whose enumerator throws from {@code start()} and waits (up to ten
     * seconds) for the operator coordinator context to report the job as failed with that
     * very exception.
     */
    @Test
    public void testFailJobWhenExceptionThrownFromStart() throws Exception {
        final RuntimeException failureReason = new RuntimeException("Artificial Exception");
        final MockSplitEnumerator splitEnumerator = new MockSplitEnumerator(1, (SplitEnumeratorContext)new MockSplitEnumeratorContext(1)){

            @Override
            public void start() {
                throw failureReason;
            }
        };
        // The source always hands out the failing enumerator created above.
        final EnumeratorCreatingSource source = new EnumeratorCreatingSource(() -> splitEnumerator);
        final SourceCoordinator coordinator = new SourceCoordinator("TestOperator", this.coordinatorExecutor, source, this.context);

        coordinator.start();

        CommonTestUtils.waitUtil(
                this.operatorCoordinatorContext::isJobFailed,
                Duration.ofSeconds(10L),
                "The job should have failed due to the artificial exception.");
        Assert.assertEquals(failureReason, this.operatorCoordinatorContext.getJobFailureReason());
    }

    /**
     * Sends a source event to a coordinator whose enumerator throws an {@link Error} from
     * {@code handleSourceEvent} and waits (up to ten seconds) for the operator coordinator
     * context to report the job as failed with that very error.
     */
    @Test
    public void testErrorThrownFromSplitEnumerator() throws Exception {
        final Error error = new Error("Test Error");
        final MockSplitEnumerator splitEnumerator = new MockSplitEnumerator(1, (SplitEnumeratorContext)new MockSplitEnumeratorContext(1)){

            @Override
            public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
                throw error;
            }
        };
        // The source always hands out the failing enumerator created above.
        final EnumeratorCreatingSource source = new EnumeratorCreatingSource(() -> splitEnumerator);
        final SourceCoordinator coordinator = new SourceCoordinator("TestOperator", this.coordinatorExecutor, source, this.context);

        coordinator.start();
        final SourceEventWrapper wrappedEvent = new SourceEventWrapper(new SourceEvent(){});
        coordinator.handleEventFromOperator(1, wrappedEvent);

        CommonTestUtils.waitUtil(
                this.operatorCoordinatorContext::isJobFailed,
                Duration.ofSeconds(10L),
                "The job should have failed due to the artificial exception.");
        Assert.assertEquals(error, this.operatorCoordinatorContext.getJobFailureReason());
    }

    /**
     * Verifies that a newly created enumerator sees the class loader supplied through the
     * operator coordinator context, both while it is constructed and when {@code start()}
     * runs.
     *
     * <p>The class loader and the coordinator are released even when an assertion fails, so a
     * failing run does not leave them open.
     */
    @Test
    public void testUserClassLoaderWhenCreatingNewEnumerator() throws Exception {
        try (URLClassLoader testClassLoader = new URLClassLoader(new URL[0])) {
            MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(new OperatorID(), testClassLoader);
            EnumeratorCreatingSource<?, ClassLoaderTestEnumerator> source = new EnumeratorCreatingSource<>(ClassLoaderTestEnumerator::new);
            SourceCoordinatorProvider provider = new SourceCoordinatorProvider("testOperator", context.getOperatorId(), source, 1);
            OperatorCoordinator coordinator = provider.getCoordinator((OperatorCoordinator.Context)context);
            try {
                coordinator.start();

                // The future completes once the source has been asked to create the enumerator.
                ClassLoaderTestEnumerator enumerator = source.createEnumeratorFuture.get();
                Assert.assertSame(testClassLoader, enumerator.constructorClassLoader);
                Assert.assertSame(testClassLoader, enumerator.threadClassLoader.get());
            }
            finally {
                // Close the coordinator before the class loader it was configured with.
                coordinator.close();
            }
        }
    }

    /**
     * Verifies that an enumerator restored from a checkpoint sees the class loader supplied
     * through the operator coordinator context, both while it is constructed and when
     * {@code start()} runs.
     *
     * <p>The class loader and the coordinator are released even when an assertion fails, so a
     * failing run does not leave them open.
     */
    @Test
    public void testUserClassLoaderWhenRestoringEnumerator() throws Exception {
        try (URLClassLoader testClassLoader = new URLClassLoader(new URL[0])) {
            MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(new OperatorID(), testClassLoader);
            EnumeratorCreatingSource<?, ClassLoaderTestEnumerator> source = new EnumeratorCreatingSource<>(ClassLoaderTestEnumerator::new);
            SourceCoordinatorProvider provider = new SourceCoordinatorProvider("testOperator", context.getOperatorId(), source, 1);
            OperatorCoordinator coordinator = provider.getCoordinator((OperatorCoordinator.Context)context);
            try {
                // Reset from an empty checkpoint before starting the coordinator.
                coordinator.resetToCheckpoint(1L, SourceCoordinatorTest.createEmptyCheckpoint(1L));
                coordinator.start();

                // The future completes once the source has been asked to restore the enumerator.
                ClassLoaderTestEnumerator enumerator = source.restoreEnumeratorFuture.get();
                Assert.assertSame(testClassLoader, enumerator.constructorClassLoader);
                Assert.assertSame(testClassLoader, enumerator.threadClassLoader.get());
            }
            finally {
                // Close the coordinator before the class loader it was configured with.
                coordinator.close();
            }
        }
    }

    /**
     * Submits {@code runnable} to {@code coordinatorExecutor} and blocks until the returned
     * future has completed.
     *
     * <p>Any exception raised while submitting or waiting fails the test with an
     * {@link AssertionError}. The exception is attached as the cause so that the failure
     * report keeps its stack trace (presumably including an assertion failure raised inside
     * {@code runnable}, which the future is expected to wrap).
     *
     * @param runnable the checks to execute through the coordinator executor
     * @throws AssertionError if submitting the task or waiting for it fails
     */
    private void check(Runnable runnable) {
        try {
            this.coordinatorExecutor.submit(runnable).get();
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag so that the test runner can still observe it.
                Thread.currentThread().interrupt();
            }
            throw new AssertionError("Test failed due to " + e, e);
        }
    }

    /**
     * Serializes a checkpoint with an empty enumerator state for the given checkpoint id.
     *
     * <p>A temporary {@code SourceCoordinatorContext} is created for the serialization and
     * closed again before the method returns.
     *
     * @param checkpointId the id passed to {@code SourceCoordinator.writeCheckpointBytes}
     * @return the bytes produced by {@code SourceCoordinator.writeCheckpointBytes}
     * @throws Exception if creating the context, writing the checkpoint, or closing the context fails
     */
    private static byte[] createEmptyCheckpoint(long checkpointId) throws Exception {
        try (SourceCoordinatorContext emptyContext = new SourceCoordinatorContext(
                Executors.newDirectExecutorService(),
                new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory("test", SourceCoordinatorProviderTest.class.getClassLoader()),
                1,
                (OperatorCoordinator.Context)new MockOperatorCoordinatorContext(new OperatorID(), 0),
                (SimpleVersionedSerializer)new MockSourceSplitSerializer())) {
            return SourceCoordinator.writeCheckpointBytes(
                    checkpointId,
                    Collections.emptySet(),
                    emptyContext,
                    (SimpleVersionedSerializer)new MockSplitEnumeratorCheckpointSerializer(),
                    (SimpleVersionedSerializer)new MockSourceSplitSerializer());
        }
    }

    /**
     * Identity helper: returns the enumerator it is given, unchanged.
     *
     * <p>NOTE(review): the {@code lambda$...} name and the {@code synthetic} marker suggest
     * this is a lambda body that the decompiler emitted as a named method.
     *
     * @param splitEnumerator the enumerator to hand back
     * @return the same {@code splitEnumerator} instance
     */
    private static /* synthetic */ SplitEnumerator lambda$testErrorThrownFromSplitEnumerator$13(SplitEnumerator splitEnumerator) {
        return splitEnumerator;
    }

    /**
     * Identity helper: returns the enumerator it is given, unchanged.
     *
     * <p>NOTE(review): the {@code lambda$...} name and the {@code synthetic} marker suggest
     * this is a lambda body that the decompiler emitted as a named method.
     *
     * @param splitEnumerator the enumerator to hand back
     * @return the same {@code splitEnumerator} instance
     */
    private static /* synthetic */ SplitEnumerator lambda$testFailJobWhenExceptionThrownFromStart$11(SplitEnumerator splitEnumerator) {
        return splitEnumerator;
    }

    /**
     * Test {@link Source} that obtains its enumerators from a {@link Supplier} and publishes
     * each enumerator it hands out through a future, so that a test can get hold of the
     * instance.
     *
     * @param <T> the record type of the source
     * @param <EnumT> the concrete enumerator type produced by the factory
     */
    private static final class EnumeratorCreatingSource<T, EnumT extends SplitEnumerator<MockSourceSplit, Set<MockSourceSplit>>>
    implements Source<T, MockSourceSplit, Set<MockSourceSplit>> {
        /** Completed with the enumerator returned by {@link #createEnumerator}. */
        final CompletableFuture<EnumT> createEnumeratorFuture = new CompletableFuture<>();
        /** Completed with the enumerator returned by {@link #restoreEnumerator}. */
        final CompletableFuture<EnumT> restoreEnumeratorFuture = new CompletableFuture<>();
        /** Produces the enumerator for both the create and the restore path. */
        private final Supplier<EnumT> enumeratorFactory;

        /**
         * @param enumeratorFactory supplier invoked each time an enumerator is requested
         */
        public EnumeratorCreatingSource(Supplier<EnumT> enumeratorFactory) {
            this.enumeratorFactory = enumeratorFactory;
        }

        @Override
        public Boundedness getBoundedness() {
            return Boundedness.CONTINUOUS_UNBOUNDED;
        }

        /** Not supported: this source is only used for its enumerator. */
        @Override
        public SourceReader<T, MockSourceSplit> createReader(SourceReaderContext readerContext) {
            throw new UnsupportedOperationException();
        }

        /**
         * Obtains an enumerator from the factory, publishes it through
         * {@link #createEnumeratorFuture}, and returns it. The context argument is ignored.
         */
        @Override
        public SplitEnumerator<MockSourceSplit, Set<MockSourceSplit>> createEnumerator(SplitEnumeratorContext<MockSourceSplit> enumContext) {
            // Typed as EnumT so that it can complete the CompletableFuture<EnumT> without a raw cast.
            final EnumT enumerator = this.enumeratorFactory.get();
            this.createEnumeratorFuture.complete(enumerator);
            return enumerator;
        }

        /**
         * Obtains an enumerator from the factory, publishes it through
         * {@link #restoreEnumeratorFuture}, and returns it. The context and checkpoint
         * arguments are ignored.
         */
        @Override
        public SplitEnumerator<MockSourceSplit, Set<MockSourceSplit>> restoreEnumerator(SplitEnumeratorContext<MockSourceSplit> enumContext, Set<MockSourceSplit> checkpoint) {
            // Typed as EnumT so that it can complete the CompletableFuture<EnumT> without a raw cast.
            final EnumT enumerator = this.enumeratorFactory.get();
            this.restoreEnumeratorFuture.complete(enumerator);
            return enumerator;
        }

        @Override
        public SimpleVersionedSerializer<MockSourceSplit> getSplitSerializer() {
            return new MockSourceSplitSerializer();
        }

        @Override
        public SimpleVersionedSerializer<Set<MockSourceSplit>> getEnumeratorCheckpointSerializer() {
            return new MockSplitEnumeratorCheckpointSerializer();
        }
    }

    private static final class ClassLoaderTestEnumerator
    implements SplitEnumerator<MockSourceSplit, Set<MockSourceSplit>> {
        final CompletableFuture<ClassLoader> threadClassLoader = new CompletableFuture();
        final ClassLoader constructorClassLoader = Thread.currentThread().getContextClassLoader();

        public void start() {
            this.threadClassLoader.complete(Thread.currentThread().getContextClassLoader());
        }

        public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
            throw new UnsupportedOperationException();
        }

        public void addSplitsBack(List<MockSourceSplit> splits, int subtaskId) {
            throw new UnsupportedOperationException();
        }

        public void addReader(int subtaskId) {
            throw new UnsupportedOperationException();
        }

        public Set<MockSourceSplit> snapshotState() throws Exception {
            throw new UnsupportedOperationException();
        }

        public void close() {
        }
    }
}

