/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.source.coordinator;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
import org.apache.flink.runtime.source.coordinator.SourceCoordinator;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext;
import org.apache.flink.util.FatalExitExceptionHandler;

public class SourceCoordinatorProvider<SplitT extends SourceSplit>
extends RecreateOnResetOperatorCoordinator.Provider {
    private static final long serialVersionUID = -1921681440009738462L;
    private final String operatorName;
    private final Source<?, SplitT, ?> source;
    private final int numWorkerThreads;

    public SourceCoordinatorProvider(String operatorName, OperatorID operatorID, Source<?, SplitT, ?> source, int numWorkerThreads) {
        super(operatorID);
        this.operatorName = operatorName;
        this.source = source;
        this.numWorkerThreads = numWorkerThreads;
    }

    @Override
    public OperatorCoordinator getCoordinator(OperatorCoordinator.Context context) {
        String coordinatorThreadName = "SourceCoordinator-" + this.operatorName;
        CoordinatorExecutorThreadFactory coordinatorThreadFactory = new CoordinatorExecutorThreadFactory(coordinatorThreadName, context.getUserCodeClassloader());
        ExecutorService coordinatorExecutor = Executors.newSingleThreadExecutor(coordinatorThreadFactory);
        SimpleVersionedSerializer splitSerializer = this.source.getSplitSerializer();
        SourceCoordinatorContext sourceCoordinatorContext = new SourceCoordinatorContext(coordinatorExecutor, coordinatorThreadFactory, this.numWorkerThreads, context, splitSerializer);
        return new SourceCoordinator(this.operatorName, coordinatorExecutor, this.source, sourceCoordinatorContext);
    }

    public static class CoordinatorExecutorThreadFactory
    implements ThreadFactory,
    Thread.UncaughtExceptionHandler {
        private final String coordinatorThreadName;
        private final ClassLoader cl;
        private final Thread.UncaughtExceptionHandler errorHandler;
        @Nullable
        private Thread t;
        @Nullable
        private volatile Throwable previousFailureReason;

        CoordinatorExecutorThreadFactory(String coordinatorThreadName, ClassLoader contextClassLoader) {
            this(coordinatorThreadName, contextClassLoader, (Thread.UncaughtExceptionHandler)FatalExitExceptionHandler.INSTANCE);
        }

        @VisibleForTesting
        CoordinatorExecutorThreadFactory(String coordinatorThreadName, ClassLoader contextClassLoader, Thread.UncaughtExceptionHandler errorHandler) {
            this.coordinatorThreadName = coordinatorThreadName;
            this.cl = contextClassLoader;
            this.errorHandler = errorHandler;
        }

        @Override
        public synchronized Thread newThread(Runnable r) {
            if (this.t != null && this.t.isAlive()) {
                throw new Error("Source Coordinator Thread already exists. There should never be more than one thread driving the actions of a Source Coordinator. Existing Thread: " + this.t);
            }
            if (this.t != null && this.previousFailureReason != null) {
                throw new Error("The following fatal error has happened in a previously spawned Source Coordinator thread. No new thread can be spawned.", this.previousFailureReason);
            }
            this.t = new Thread(r, this.coordinatorThreadName);
            this.t.setContextClassLoader(this.cl);
            this.t.setUncaughtExceptionHandler(this);
            return this.t;
        }

        @Override
        public synchronized void uncaughtException(Thread t, Throwable e) {
            if (this.previousFailureReason == null) {
                this.previousFailureReason = e;
            }
            this.errorHandler.uncaughtException(t, e);
        }

        String getCoordinatorThreadName() {
            return this.coordinatorThreadName;
        }

        boolean isCurrentThreadCoordinatorThread() {
            return Thread.currentThread() == this.t;
        }
    }
}

