package org.apache.flink.runtime.operators.coordination;

import java.util.Collection;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpointContext;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.ExecutionSubtaskAccess;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.SubtaskAccess;
import org.apache.flink.runtime.operators.coordination.util.IncompleteFuturesTracker;
import org.apache.flink.runtime.scheduler.GlobalFailureHandler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TemporaryClassLoaderContext;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.class */
public class OperatorCoordinatorHolder implements OperatorCoordinatorCheckpointContext, AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(OperatorCoordinatorHolder.class);
    private final OperatorCoordinator coordinator;
    private final OperatorID operatorId;
    private final LazyInitializedCoordinatorContext context;
    private final SubtaskAccess.SubtaskAccessFactory taskAccesses;
    private final int operatorParallelism;
    private final int operatorMaxParallelism;
    private GlobalFailureHandler globalFailureHandler;
    private ComponentMainThreadExecutor mainThreadExecutor;
    private final IncompleteFuturesTracker unconfirmedEvents = new IncompleteFuturesTracker();
    private final OperatorEventValve eventValve = new OperatorEventValve();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.class */
    public static final class LazyInitializedCoordinatorContext implements OperatorCoordinator.Context {
        private static final Logger LOG = LoggerFactory.getLogger(LazyInitializedCoordinatorContext.class);
        private final OperatorID operatorId;
        private final String operatorName;
        private final ClassLoader userCodeClassLoader;
        private final int operatorParallelism;
        private final CoordinatorStore coordinatorStore;
        private final boolean supportsConcurrentExecutionAttempts;
        private GlobalFailureHandler globalFailureHandler;
        private Executor schedulerExecutor;
        private volatile boolean failed;

        public LazyInitializedCoordinatorContext(OperatorID operatorID, String str, ClassLoader classLoader, int i, CoordinatorStore coordinatorStore, boolean z) {
            this.operatorId = (OperatorID) Preconditions.checkNotNull(operatorID);
            this.operatorName = (String) Preconditions.checkNotNull(str);
            this.userCodeClassLoader = (ClassLoader) Preconditions.checkNotNull(classLoader);
            this.operatorParallelism = i;
            this.coordinatorStore = (CoordinatorStore) Preconditions.checkNotNull(coordinatorStore);
            this.supportsConcurrentExecutionAttempts = z;
        }

        void lazyInitialize(GlobalFailureHandler globalFailureHandler, Executor executor) {
            this.globalFailureHandler = (GlobalFailureHandler) Preconditions.checkNotNull(globalFailureHandler);
            this.schedulerExecutor = (Executor) Preconditions.checkNotNull(executor);
        }

        void unInitialize() {
            this.globalFailureHandler = null;
            this.schedulerExecutor = null;
        }

        boolean isInitialized() {
            return this.schedulerExecutor != null;
        }

        private void checkInitialized() {
            Preconditions.checkState(isInitialized(), "Context was not yet initialized");
        }

        void resetFailed() {
            this.failed = false;
        }

        @Override // org.apache.flink.runtime.operators.coordination.OperatorCoordinator.Context
        public OperatorID getOperatorId() {
            return this.operatorId;
        }

        @Override // org.apache.flink.runtime.operators.coordination.OperatorCoordinator.Context
        public void failJob(Throwable th) {
            checkInitialized();
            FlinkException flinkException = new FlinkException("Global failure triggered by OperatorCoordinator for '" + this.operatorName + "' (operator " + this.operatorId + ").", th);
            if (this.failed) {
                LOG.debug("Ignoring the request to fail job because the job is already failing. The ignored failure cause is", flinkException);
            } else {
                this.failed = true;
                this.schedulerExecutor.execute(() -> {
                    this.globalFailureHandler.handleGlobalFailure(flinkException);
                });
            }
        }

        @Override // org.apache.flink.runtime.operators.coordination.OperatorCoordinator.Context
        public int currentParallelism() {
            return this.operatorParallelism;
        }

        @Override // org.apache.flink.runtime.operators.coordination.OperatorCoordinator.Context
        public ClassLoader getUserCodeClassloader() {
            return this.userCodeClassLoader;
        }

        @Override // org.apache.flink.runtime.operators.coordination.OperatorCoordinator.Context
        public CoordinatorStore getCoordinatorStore() {
            return this.coordinatorStore;
        }

        @Override // org.apache.flink.runtime.operators.coordination.OperatorCoordinator.Context
        public boolean isConcurrentExecutionAttemptsSupported() {
            return this.supportsConcurrentExecutionAttempts;
        }
    }

    private OperatorCoordinatorHolder(OperatorID operatorID, OperatorCoordinator operatorCoordinator, LazyInitializedCoordinatorContext lazyInitializedCoordinatorContext, SubtaskAccess.SubtaskAccessFactory subtaskAccessFactory, int i, int i2) {
        this.operatorId = (OperatorID) Preconditions.checkNotNull(operatorID);
        this.coordinator = (OperatorCoordinator) Preconditions.checkNotNull(operatorCoordinator);
        this.context = (LazyInitializedCoordinatorContext) Preconditions.checkNotNull(lazyInitializedCoordinatorContext);
        this.taskAccesses = (SubtaskAccess.SubtaskAccessFactory) Preconditions.checkNotNull(subtaskAccessFactory);
        this.operatorParallelism = i;
        this.operatorMaxParallelism = i2;
    }

    public void lazyInitialize(GlobalFailureHandler globalFailureHandler, ComponentMainThreadExecutor componentMainThreadExecutor) {
        this.globalFailureHandler = globalFailureHandler;
        this.mainThreadExecutor = componentMainThreadExecutor;
        this.eventValve.setMainThreadExecutorForValidation(componentMainThreadExecutor);
        this.context.lazyInitialize(globalFailureHandler, componentMainThreadExecutor);
        setupAllSubtaskGateways();
    }

    public OperatorCoordinator coordinator() {
        return this.coordinator;
    }

    @Override // org.apache.flink.runtime.operators.coordination.OperatorInfo
    public OperatorID operatorId() {
        return this.operatorId;
    }

    @Override // org.apache.flink.runtime.operators.coordination.OperatorInfo
    public int maxParallelism() {
        return this.operatorMaxParallelism;
    }

    @Override // org.apache.flink.runtime.operators.coordination.OperatorInfo
    public int currentParallelism() {
        return this.operatorParallelism;
    }

    public void start() throws Exception {
        this.mainThreadExecutor.assertRunningInMainThread();
        Preconditions.checkState(this.context.isInitialized(), "Coordinator Context is not yet initialized");
        this.coordinator.start();
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.coordinator.close();
        this.context.unInitialize();
    }

    public void handleEventFromOperator(int i, int i2, OperatorEvent operatorEvent) throws Exception {
        this.mainThreadExecutor.assertRunningInMainThread();
        this.coordinator.handleEventFromOperator(i, i2, operatorEvent);
    }

    public void executionAttemptFailed(int i, int i2, @Nullable Throwable th) {
        this.mainThreadExecutor.assertRunningInMainThread();
        this.coordinator.executionAttemptFailed(i, i2, th);
    }

    @Override // org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpointContext
    public void subtaskReset(int i, long j) {
        this.mainThreadExecutor.assertRunningInMainThread();
        setupSubtaskGateway(i);
        this.coordinator.subtaskReset(i, j);
    }

    @Override // org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpointContext
    public void checkpointCoordinator(long j, CompletableFuture<byte[]> completableFuture) {
        this.mainThreadExecutor.execute(() -> {
            checkpointCoordinatorInternal(j, completableFuture);
        });
    }

    @Override // org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpointContext
    public void notifyCheckpointComplete(long j) {
        this.mainThreadExecutor.execute(() -> {
            this.coordinator.notifyCheckpointComplete(j);
        });
    }

    @Override // org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpointContext
    public void notifyCheckpointAborted(long j) {
        this.mainThreadExecutor.execute(() -> {
            this.coordinator.notifyCheckpointAborted(j);
        });
    }

    @Override // org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpointContext
    public void resetToCheckpoint(long j, @Nullable byte[] bArr) throws Exception {
        if (this.mainThreadExecutor != null) {
            this.mainThreadExecutor.assertRunningInMainThread();
        }
        this.eventValve.openValveAndUnmarkCheckpoint();
        this.context.resetFailed();
        if (this.mainThreadExecutor != null) {
            setupAllSubtaskGateways();
        }
        this.coordinator.resetToCheckpoint(j, bArr);
    }

    private void checkpointCoordinatorInternal(long j, CompletableFuture<byte[]> completableFuture) {
        this.mainThreadExecutor.assertRunningInMainThread();
        CompletableFuture<byte[]> completableFuture2 = new CompletableFuture<>();
        FutureUtils.assertNoException(completableFuture2.handleAsync((BiFunction<? super byte[], Throwable, ? extends U>) (bArr, th) -> {
            if (th != null) {
                completableFuture.completeExceptionally(th);
                return null;
            }
            if (this.eventValve.tryShutValve(j)) {
                completeCheckpointOnceEventsAreDone(j, completableFuture, bArr);
                return null;
            }
            completableFuture.completeExceptionally(new FlinkException("Cannot shut event valve"));
            return null;
        }, (Executor) this.mainThreadExecutor));
        try {
            this.eventValve.markForCheckpoint(j);
            this.coordinator.checkpointCoordinator(j, completableFuture2);
        } catch (Throwable th2) {
            ExceptionUtils.rethrowIfFatalErrorOrOOM(th2);
            completableFuture.completeExceptionally(th2);
            this.globalFailureHandler.handleGlobalFailure(th2);
        }
    }

    private void completeCheckpointOnceEventsAreDone(long j, CompletableFuture<byte[]> completableFuture, byte[] bArr) {
        Collection<CompletableFuture<?>> currentIncompleteAndReset = this.unconfirmedEvents.getCurrentIncompleteAndReset();
        if (currentIncompleteAndReset.isEmpty()) {
            completableFuture.complete(bArr);
        } else {
            LOG.info("Coordinator checkpoint {} for coordinator {} is awaiting {} pending events", new Object[]{Long.valueOf(j), this.operatorId, Integer.valueOf(currentIncompleteAndReset.size())});
            FutureUtils.waitForAll(currentIncompleteAndReset).whenComplete((obj, th) -> {
                if (th == null) {
                    completableFuture.complete(bArr);
                } else {
                    completableFuture.completeExceptionally(new FlinkException("Failing OperatorCoordinator checkpoint because some OperatorEvents before this checkpoint barrier were not received by the target tasks."));
                }
            });
        }
    }

    @Override // org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpointContext
    public void afterSourceBarrierInjection(long j) {
        this.mainThreadExecutor.execute(() -> {
            this.eventValve.openValveAndUnmarkCheckpoint(j);
        });
    }

    @Override // org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpointContext
    public void abortCurrentTriggering() {
        ComponentMainThreadExecutor componentMainThreadExecutor = this.mainThreadExecutor;
        OperatorEventValve operatorEventValve = this.eventValve;
        operatorEventValve.getClass();
        componentMainThreadExecutor.execute(operatorEventValve::openValveAndUnmarkCheckpoint);
    }

    private void setupAllSubtaskGateways() {
        for (int i = 0; i < this.operatorParallelism; i++) {
            setupSubtaskGateway(i);
        }
    }

    private void setupSubtaskGateway(int i) {
        Iterator<SubtaskAccess> it = this.taskAccesses.getAccessesForSubtask(i).iterator();
        while (it.hasNext()) {
            setupSubtaskGateway(it.next());
        }
    }

    public void setupSubtaskGatewayForAttempts(int i, Set<Integer> set) {
        Iterator<Integer> it = set.iterator();
        while (it.hasNext()) {
            setupSubtaskGateway(this.taskAccesses.getAccessForAttempt(i, it.next().intValue()));
        }
    }

    private void setupSubtaskGateway(SubtaskAccess subtaskAccess) {
        SubtaskGatewayImpl subtaskGatewayImpl = new SubtaskGatewayImpl(subtaskAccess, this.eventValve, this.mainThreadExecutor, this.unconfirmedEvents);
        FutureUtils.assertNoException(subtaskAccess.hasSwitchedToRunning().thenAccept(obj -> {
            this.mainThreadExecutor.assertRunningInMainThread();
            if (subtaskAccess.isStillRunning()) {
                notifySubtaskReady(subtaskGatewayImpl);
            }
        }));
    }

    private void notifySubtaskReady(OperatorCoordinator.SubtaskGateway subtaskGateway) {
        try {
            this.coordinator.executionAttemptReady(subtaskGateway.getSubtask(), subtaskGateway.getExecution().getAttemptNumber(), subtaskGateway);
        } catch (Throwable th) {
            ExceptionUtils.rethrowIfFatalErrorOrOOM(th);
            this.globalFailureHandler.handleGlobalFailure(new FlinkException("Error from OperatorCoordinator", th));
        }
    }

    public static OperatorCoordinatorHolder create(SerializedValue<OperatorCoordinator.Provider> serializedValue, ExecutionJobVertex executionJobVertex, ClassLoader classLoader, CoordinatorStore coordinatorStore, boolean z) throws Exception {
        TemporaryClassLoaderContext of = TemporaryClassLoaderContext.of(classLoader);
        Throwable th = null;
        try {
            try {
                OperatorCoordinator.Provider provider = (OperatorCoordinator.Provider) serializedValue.deserializeValue(classLoader);
                OperatorID operatorId = provider.getOperatorId();
                OperatorCoordinatorHolder create = create(operatorId, provider, coordinatorStore, executionJobVertex.getName(), executionJobVertex.getGraph().getUserClassLoader(), executionJobVertex.getParallelism(), executionJobVertex.getMaxParallelism(), new ExecutionSubtaskAccess.ExecutionJobVertexSubtaskAccess(executionJobVertex, operatorId), z);
                if (of != null) {
                    if (0 != 0) {
                        try {
                            of.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        of.close();
                    }
                }
                return create;
            } finally {
            }
        } catch (Throwable th3) {
            if (of != null) {
                if (th != null) {
                    try {
                        of.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    of.close();
                }
            }
            throw th3;
        }
    }

    @VisibleForTesting
    static OperatorCoordinatorHolder create(OperatorID operatorID, OperatorCoordinator.Provider provider, CoordinatorStore coordinatorStore, String str, ClassLoader classLoader, int i, int i2, SubtaskAccess.SubtaskAccessFactory subtaskAccessFactory, boolean z) throws Exception {
        LazyInitializedCoordinatorContext lazyInitializedCoordinatorContext = new LazyInitializedCoordinatorContext(operatorID, str, classLoader, i, coordinatorStore, z);
        return new OperatorCoordinatorHolder(operatorID, provider.create(lazyInitializedCoordinatorContext), lazyInitializedCoordinatorContext, subtaskAccessFactory, i, i2);
    }
}
