/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.source.coordinator;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.ReaderInfo;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
import org.apache.flink.runtime.source.coordinator.ExecutorNotifier;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils;
import org.apache.flink.runtime.source.coordinator.SplitAssignmentTracker;
import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.runtime.source.event.SourceEventWrapper;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;

@Internal
public class SourceCoordinatorContext<SplitT extends SourceSplit>
implements SplitEnumeratorContext<SplitT>,
AutoCloseable {
    private final ExecutorService coordinatorExecutor;
    private final ExecutorNotifier notifier;
    private final OperatorCoordinator.Context operatorCoordinatorContext;
    private final SimpleVersionedSerializer<SplitT> splitSerializer;
    private final ConcurrentMap<Integer, ReaderInfo> registeredReaders;
    private final SplitAssignmentTracker<SplitT> assignmentTracker;
    private final SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory;
    private final String coordinatorThreadName;

    public SourceCoordinatorContext(ExecutorService coordinatorExecutor, SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory, int numWorkerThreads, OperatorCoordinator.Context operatorCoordinatorContext, SimpleVersionedSerializer<SplitT> splitSerializser) {
        this(coordinatorExecutor, coordinatorThreadFactory, numWorkerThreads, operatorCoordinatorContext, splitSerializser, new SplitAssignmentTracker());
    }

    SourceCoordinatorContext(ExecutorService coordinatorExecutor, SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory, int numWorkerThreads, OperatorCoordinator.Context operatorCoordinatorContext, SimpleVersionedSerializer<SplitT> splitSerializer, SplitAssignmentTracker<SplitT> splitAssignmentTracker) {
        this.coordinatorExecutor = coordinatorExecutor;
        this.coordinatorThreadFactory = coordinatorThreadFactory;
        this.operatorCoordinatorContext = operatorCoordinatorContext;
        this.splitSerializer = splitSerializer;
        this.registeredReaders = new ConcurrentHashMap<Integer, ReaderInfo>();
        this.assignmentTracker = splitAssignmentTracker;
        this.coordinatorThreadName = coordinatorThreadFactory.getCoordinatorThreadName();
        this.notifier = new ExecutorNotifier(Executors.newScheduledThreadPool(numWorkerThreads, new ThreadFactory(){
            private int index = 0;

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, SourceCoordinatorContext.this.coordinatorThreadName + "-worker-" + this.index++);
            }
        }), coordinatorExecutor);
    }

    public MetricGroup metricGroup() {
        return null;
    }

    public void sendEventToSourceReader(int subtaskId, SourceEvent event) {
        this.callInCoordinatorThread(() -> {
            try {
                this.operatorCoordinatorContext.sendEvent(new SourceEventWrapper(event), subtaskId);
                return null;
            }
            catch (TaskNotRunningException e) {
                throw new FlinkRuntimeException(String.format("Failed to send event %s to subtask %d", event, subtaskId), (Throwable)((Object)e));
            }
        }, String.format("Failed to send event %s to subtask %d", event, subtaskId));
    }

    public int currentParallelism() {
        return this.operatorCoordinatorContext.currentParallelism();
    }

    public Map<Integer, ReaderInfo> registeredReaders() {
        return Collections.unmodifiableMap(this.registeredReaders);
    }

    public void assignSplits(SplitsAssignment<SplitT> assignment) {
        this.callInCoordinatorThread(() -> {
            for (Integer subtaskId : assignment.assignment().keySet()) {
                if (this.registeredReaders.containsKey(subtaskId)) continue;
                throw new IllegalArgumentException(String.format("Cannot assign splits %s to subtask %d because the subtask is not registered.", this.registeredReaders.get(subtaskId), subtaskId));
            }
            this.assignmentTracker.recordSplitAssignment(assignment);
            assignment.assignment().forEach((id, splits) -> {
                try {
                    this.operatorCoordinatorContext.sendEvent(new AddSplitEvent<SplitT>(splits, this.splitSerializer), (int)id);
                }
                catch (TaskNotRunningException e) {
                    throw new FlinkRuntimeException(String.format("Failed to assign splits %s to reader %d.", splits, id), (Throwable)((Object)e));
                }
                catch (IOException e) {
                    throw new FlinkRuntimeException("Failed to serialize splits.", (Throwable)e);
                }
            });
            return null;
        }, String.format("Failed to assign splits %s due to ", assignment));
    }

    public <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler, long initialDelay, long period) {
        this.notifier.notifyReadyAsync(callable, handler, initialDelay, period);
    }

    public <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler) {
        this.notifier.notifyReadyAsync(callable, handler);
    }

    @Override
    public void close() throws InterruptedException {
        this.notifier.close();
        this.coordinatorExecutor.shutdown();
        this.coordinatorExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    }

    void failJob(Throwable cause) {
        this.operatorCoordinatorContext.failJob(cause);
    }

    void snapshotState(long checkpointId, SimpleVersionedSerializer<SplitT> splitSerializer, DataOutputStream out) throws Exception {
        SourceCoordinatorSerdeUtils.writeRegisteredReaders(this.registeredReaders, out);
        this.assignmentTracker.snapshotState(checkpointId, splitSerializer, out);
    }

    void restoreState(SimpleVersionedSerializer<SplitT> splitSerializer, DataInputStream in) throws Exception {
        Map<Integer, ReaderInfo> readers = SourceCoordinatorSerdeUtils.readRegisteredReaders(in);
        this.registeredReaders.clear();
        this.registeredReaders.putAll(readers);
        this.assignmentTracker.restoreState(splitSerializer, in);
    }

    void registerSourceReader(ReaderInfo readerInfo) {
        this.registeredReaders.put(readerInfo.getSubtaskId(), readerInfo);
    }

    void unregisterSourceReader(int subtaskId) {
        Preconditions.checkNotNull(this.registeredReaders.remove(subtaskId), (String)String.format("Failed to unregister source reader of id %s because it is not registered.", subtaskId));
    }

    List<SplitT> getAndRemoveUncheckpointedAssignment(int failedSubtaskId) {
        return this.assignmentTracker.getAndRemoveUncheckpointedAssignment(failedSubtaskId);
    }

    void onCheckpointComplete(long checkpointId) {
        this.assignmentTracker.onCheckpointComplete(checkpointId);
    }

    private <V> V callInCoordinatorThread(Callable<V> callable, String errorMessage) {
        if (!this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread()) {
            try {
                return this.coordinatorExecutor.submit(callable).get();
            }
            catch (InterruptedException | ExecutionException e) {
                throw new FlinkRuntimeException(errorMessage, (Throwable)e);
            }
        }
        try {
            return callable.call();
        }
        catch (Exception e) {
            throw new FlinkRuntimeException(errorMessage, (Throwable)e);
        }
    }
}

