/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.coordination;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpointContext;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.ExecutionSubtaskAccess;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventValve;
import org.apache.flink.runtime.operators.coordination.SubtaskAccess;
import org.apache.flink.runtime.operators.coordination.SubtaskGatewayImpl;
import org.apache.flink.runtime.operators.coordination.util.IncompleteFuturesTracker;
import org.apache.flink.runtime.scheduler.SchedulerNG;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TemporaryClassLoaderContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OperatorCoordinatorHolder
implements OperatorCoordinatorCheckpointContext,
AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(OperatorCoordinatorHolder.class);
    private final OperatorCoordinator coordinator;
    private final OperatorID operatorId;
    private final LazyInitializedCoordinatorContext context;
    private final SubtaskAccess.SubtaskAccessFactory taskAccesses;
    private final OperatorEventValve eventValve;
    private final IncompleteFuturesTracker unconfirmedEvents;
    private final int operatorParallelism;
    private final int operatorMaxParallelism;
    private Consumer<Throwable> globalFailureHandler;
    private ComponentMainThreadExecutor mainThreadExecutor;

    private OperatorCoordinatorHolder(OperatorID operatorId, OperatorCoordinator coordinator, LazyInitializedCoordinatorContext context, SubtaskAccess.SubtaskAccessFactory taskAccesses, int operatorParallelism, int operatorMaxParallelism) {
        this.operatorId = (OperatorID)((Object)Preconditions.checkNotNull((Object)((Object)operatorId)));
        this.coordinator = (OperatorCoordinator)Preconditions.checkNotNull((Object)coordinator);
        this.context = (LazyInitializedCoordinatorContext)Preconditions.checkNotNull((Object)context);
        this.taskAccesses = (SubtaskAccess.SubtaskAccessFactory)Preconditions.checkNotNull((Object)taskAccesses);
        this.operatorParallelism = operatorParallelism;
        this.operatorMaxParallelism = operatorMaxParallelism;
        this.unconfirmedEvents = new IncompleteFuturesTracker();
        this.eventValve = new OperatorEventValve();
    }

    public void lazyInitialize(SchedulerNG scheduler, ComponentMainThreadExecutor mainThreadExecutor) {
        this.lazyInitialize(scheduler::handleGlobalFailure, mainThreadExecutor);
    }

    @VisibleForTesting
    void lazyInitialize(Consumer<Throwable> globalFailureHandler, ComponentMainThreadExecutor mainThreadExecutor) {
        this.globalFailureHandler = globalFailureHandler;
        this.mainThreadExecutor = mainThreadExecutor;
        this.eventValve.setMainThreadExecutorForValidation(mainThreadExecutor);
        this.context.lazyInitialize(globalFailureHandler, mainThreadExecutor);
        this.setupAllSubtaskGateways();
    }

    public OperatorCoordinator coordinator() {
        return this.coordinator;
    }

    @Override
    public OperatorID operatorId() {
        return this.operatorId;
    }

    @Override
    public int maxParallelism() {
        return this.operatorMaxParallelism;
    }

    @Override
    public int currentParallelism() {
        return this.operatorParallelism;
    }

    public void start() throws Exception {
        this.mainThreadExecutor.assertRunningInMainThread();
        Preconditions.checkState((boolean)this.context.isInitialized(), (Object)"Coordinator Context is not yet initialized");
        this.coordinator.start();
    }

    @Override
    public void close() throws Exception {
        this.coordinator.close();
        this.context.unInitialize();
    }

    public void handleEventFromOperator(int subtask, OperatorEvent event) throws Exception {
        this.mainThreadExecutor.assertRunningInMainThread();
        this.coordinator.handleEventFromOperator(subtask, event);
    }

    public void subtaskFailed(int subtask, @Nullable Throwable reason) {
        this.mainThreadExecutor.assertRunningInMainThread();
        this.coordinator.subtaskFailed(subtask, reason);
    }

    @Override
    public void subtaskReset(int subtask, long checkpointId) {
        this.mainThreadExecutor.assertRunningInMainThread();
        this.setupSubtaskGateway(subtask);
        this.coordinator.subtaskReset(subtask, checkpointId);
    }

    @Override
    public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) {
        this.mainThreadExecutor.execute(() -> this.checkpointCoordinatorInternal(checkpointId, result));
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        this.mainThreadExecutor.execute(() -> this.coordinator.notifyCheckpointComplete(checkpointId));
    }

    @Override
    public void notifyCheckpointAborted(long checkpointId) {
        this.mainThreadExecutor.execute(() -> this.coordinator.notifyCheckpointAborted(checkpointId));
    }

    @Override
    public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) throws Exception {
        if (this.mainThreadExecutor != null) {
            this.mainThreadExecutor.assertRunningInMainThread();
        }
        this.eventValve.openValveAndUnmarkCheckpoint();
        this.context.resetFailed();
        if (this.mainThreadExecutor != null) {
            this.setupAllSubtaskGateways();
        }
        this.coordinator.resetToCheckpoint(checkpointId, checkpointData);
    }

    private void checkpointCoordinatorInternal(long checkpointId, CompletableFuture<byte[]> result) {
        this.mainThreadExecutor.assertRunningInMainThread();
        CompletableFuture<byte[]> coordinatorCheckpoint = new CompletableFuture<byte[]>();
        FutureUtils.assertNoException(coordinatorCheckpoint.handleAsync((success, failure) -> {
            if (failure != null) {
                result.completeExceptionally((Throwable)failure);
            } else if (this.eventValve.tryShutValve(checkpointId)) {
                this.completeCheckpointOnceEventsAreDone(checkpointId, result, (byte[])success);
            } else {
                result.completeExceptionally(new FlinkException("Cannot shut event valve"));
            }
            return null;
        }, (Executor)this.mainThreadExecutor));
        try {
            this.eventValve.markForCheckpoint(checkpointId);
            this.coordinator.checkpointCoordinator(checkpointId, coordinatorCheckpoint);
        }
        catch (Throwable t) {
            ExceptionUtils.rethrowIfFatalErrorOrOOM((Throwable)t);
            result.completeExceptionally(t);
            this.globalFailureHandler.accept(t);
        }
    }

    private void completeCheckpointOnceEventsAreDone(long checkpointId, CompletableFuture<byte[]> checkpointFuture, byte[] checkpointResult) {
        Collection<CompletableFuture<?>> pendingEvents = this.unconfirmedEvents.getCurrentIncompleteAndReset();
        if (pendingEvents.isEmpty()) {
            checkpointFuture.complete(checkpointResult);
            return;
        }
        LOG.info("Coordinator checkpoint {} for coordinator {} is awaiting {} pending events", new Object[]{checkpointId, this.operatorId, pendingEvents.size()});
        FutureUtils.ConjunctFuture<Void> conjunct = FutureUtils.waitForAll(pendingEvents);
        conjunct.whenComplete((success, failure) -> {
            if (failure == null) {
                checkpointFuture.complete(checkpointResult);
            } else {
                checkpointFuture.completeExceptionally(new FlinkException("Failing OperatorCoordinator checkpoint because some OperatorEvents before this checkpoint barrier were not received by the target tasks."));
            }
        });
    }

    @Override
    public void afterSourceBarrierInjection(long checkpointId) {
        this.mainThreadExecutor.execute(() -> this.eventValve.openValveAndUnmarkCheckpoint(checkpointId));
    }

    @Override
    public void abortCurrentTriggering() {
        this.mainThreadExecutor.execute(this.eventValve::openValveAndUnmarkCheckpoint);
    }

    private void setupAllSubtaskGateways() {
        for (int i = 0; i < this.operatorParallelism; ++i) {
            this.setupSubtaskGateway(i);
        }
    }

    private void setupSubtaskGateway(int subtask) {
        SubtaskAccess sta = this.taskAccesses.getAccessForSubtask(subtask);
        SubtaskGatewayImpl gateway = new SubtaskGatewayImpl(sta, this.eventValve, this.mainThreadExecutor, this.unconfirmedEvents);
        FutureUtils.assertNoException(sta.hasSwitchedToRunning().thenAccept(ignored -> {
            this.mainThreadExecutor.assertRunningInMainThread();
            if (sta.isStillRunning()) {
                this.notifySubtaskReady(subtask, gateway);
            }
        }));
    }

    private void notifySubtaskReady(int subtask, OperatorCoordinator.SubtaskGateway gateway) {
        try {
            this.coordinator.subtaskReady(subtask, gateway);
        }
        catch (Throwable t) {
            ExceptionUtils.rethrowIfFatalErrorOrOOM((Throwable)t);
            this.globalFailureHandler.accept(new FlinkException("Error from OperatorCoordinator", t));
        }
    }

    public static OperatorCoordinatorHolder create(SerializedValue<OperatorCoordinator.Provider> serializedProvider, ExecutionJobVertex jobVertex, ClassLoader classLoader) throws Exception {
        try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of((ClassLoader)classLoader);){
            OperatorCoordinator.Provider provider = (OperatorCoordinator.Provider)serializedProvider.deserializeValue(classLoader);
            OperatorID opId = provider.getOperatorId();
            ExecutionSubtaskAccess.ExecutionJobVertexSubtaskAccess taskAccesses = new ExecutionSubtaskAccess.ExecutionJobVertexSubtaskAccess(jobVertex, opId);
            OperatorCoordinatorHolder operatorCoordinatorHolder = OperatorCoordinatorHolder.create(opId, provider, jobVertex.getName(), jobVertex.getGraph().getUserClassLoader(), jobVertex.getParallelism(), jobVertex.getMaxParallelism(), taskAccesses);
            return operatorCoordinatorHolder;
        }
    }

    @VisibleForTesting
    static OperatorCoordinatorHolder create(OperatorID opId, OperatorCoordinator.Provider coordinatorProvider, String operatorName, ClassLoader userCodeClassLoader, int operatorParallelism, int operatorMaxParallelism, SubtaskAccess.SubtaskAccessFactory taskAccesses) throws Exception {
        LazyInitializedCoordinatorContext context = new LazyInitializedCoordinatorContext(opId, operatorName, userCodeClassLoader, operatorParallelism);
        OperatorCoordinator coordinator = coordinatorProvider.create(context);
        return new OperatorCoordinatorHolder(opId, coordinator, context, taskAccesses, operatorParallelism, operatorMaxParallelism);
    }

    private static final class LazyInitializedCoordinatorContext
    implements OperatorCoordinator.Context {
        private static final Logger LOG = LoggerFactory.getLogger(LazyInitializedCoordinatorContext.class);
        private final OperatorID operatorId;
        private final String operatorName;
        private final ClassLoader userCodeClassLoader;
        private final int operatorParallelism;
        private Consumer<Throwable> globalFailureHandler;
        private Executor schedulerExecutor;
        private volatile boolean failed;

        public LazyInitializedCoordinatorContext(OperatorID operatorId, String operatorName, ClassLoader userCodeClassLoader, int operatorParallelism) {
            this.operatorId = (OperatorID)((Object)Preconditions.checkNotNull((Object)((Object)operatorId)));
            this.operatorName = (String)Preconditions.checkNotNull((Object)operatorName);
            this.userCodeClassLoader = (ClassLoader)Preconditions.checkNotNull((Object)userCodeClassLoader);
            this.operatorParallelism = operatorParallelism;
        }

        void lazyInitialize(Consumer<Throwable> globalFailureHandler, Executor schedulerExecutor) {
            this.globalFailureHandler = (Consumer)Preconditions.checkNotNull(globalFailureHandler);
            this.schedulerExecutor = (Executor)Preconditions.checkNotNull((Object)schedulerExecutor);
        }

        void unInitialize() {
            this.globalFailureHandler = null;
            this.schedulerExecutor = null;
        }

        boolean isInitialized() {
            return this.schedulerExecutor != null;
        }

        private void checkInitialized() {
            Preconditions.checkState((boolean)this.isInitialized(), (Object)"Context was not yet initialized");
        }

        void resetFailed() {
            this.failed = false;
        }

        @Override
        public OperatorID getOperatorId() {
            return this.operatorId;
        }

        @Override
        public void failJob(Throwable cause) {
            this.checkInitialized();
            if (this.failed) {
                LOG.warn("Ignoring the request to fail job because the job is already failing. The ignored failure cause is", cause);
                return;
            }
            this.failed = true;
            FlinkException e = new FlinkException("Global failure triggered by OperatorCoordinator for '" + this.operatorName + "' (operator " + (Object)((Object)this.operatorId) + ").", cause);
            this.schedulerExecutor.execute(() -> this.globalFailureHandler.accept(e));
        }

        @Override
        public int currentParallelism() {
            return this.operatorParallelism;
        }

        @Override
        public ClassLoader getUserCodeClassloader() {
            return this.userCodeClassLoader;
        }
    }
}

