/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.source.coordinator;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.mocks.MockSource;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.api.connector.source.mocks.MockSplitEnumerator;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.source.coordinator.SourceCoordinator;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider;
import org.apache.flink.runtime.source.coordinator.SplitAssignmentTracker;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;

public abstract class SourceCoordinatorTestBase {
    protected static final String OPERATOR_NAME = "TestOperator";
    protected static final OperatorID TEST_OPERATOR_ID = new OperatorID(1234L, 5678L);
    protected static final int NUM_SUBTASKS = 3;
    protected ExecutorService coordinatorExecutor;
    protected MockOperatorCoordinatorContext operatorCoordinatorContext;
    protected SplitAssignmentTracker<MockSourceSplit> splitSplitAssignmentTracker;
    protected SourceCoordinatorContext<MockSourceSplit> context;
    protected SourceCoordinator<?, ?> sourceCoordinator;
    private MockSplitEnumerator enumerator;

    @Before
    public void setup() throws Exception {
        this.operatorCoordinatorContext = new MockOperatorCoordinatorContext(TEST_OPERATOR_ID, 3);
        this.splitSplitAssignmentTracker = new SplitAssignmentTracker();
        String coordinatorThreadName = TEST_OPERATOR_ID.toHexString();
        SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory = new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(coordinatorThreadName, this.getClass().getClassLoader());
        this.coordinatorExecutor = Executors.newSingleThreadExecutor((ThreadFactory)coordinatorThreadFactory);
        this.context = new SourceCoordinatorContext(this.coordinatorExecutor, coordinatorThreadFactory, 1, (OperatorCoordinator.Context)this.operatorCoordinatorContext, (SimpleVersionedSerializer)new MockSourceSplitSerializer(), this.splitSplitAssignmentTracker);
        this.sourceCoordinator = this.getNewSourceCoordinator();
    }

    @After
    public void cleanUp() throws InterruptedException, TimeoutException {
        this.coordinatorExecutor.shutdown();
        if (!this.coordinatorExecutor.awaitTermination(10L, TimeUnit.SECONDS)) {
            throw new TimeoutException("Failed to close the CoordinatorExecutor before timeout.");
        }
    }

    protected MockSplitEnumerator getEnumerator() {
        if (this.enumerator == null) {
            this.enumerator = (MockSplitEnumerator)this.sourceCoordinator.getEnumerator();
            Assert.assertNotNull((String)"source was not started", (Object)this.enumerator);
        }
        return this.enumerator;
    }

    protected SourceCoordinator getNewSourceCoordinator() throws Exception {
        MockSource mockSource = new MockSource(Boundedness.BOUNDED, 6);
        return new SourceCoordinator(OPERATOR_NAME, this.coordinatorExecutor, (Source)mockSource, this.context);
    }
}

