/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptive;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.execution.CheckpointType;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.UnknownKvStateLocation;
import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
import org.apache.flink.runtime.scheduler.KvStateHandler;
import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
import org.apache.flink.runtime.scheduler.VertexEndOfDataListener;
import org.apache.flink.runtime.scheduler.adaptive.State;
import org.apache.flink.runtime.scheduler.adaptive.StateTransitions;
import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry;
import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationManager;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;

/**
 * Base class for adaptive-scheduler {@link State}s that own an {@link ExecutionGraph}.
 *
 * <p>It forwards task-, checkpoint-, queryable-state- and operator-coordinator-related calls to
 * the execution graph and its handlers, records failures in a failure collection, and informs
 * subclasses through {@link #onFailure(Throwable)} and {@link #onGloballyTerminalState(JobStatus)}.
 */
abstract class StateWithExecutionGraph implements State {

    /** Callbacks into the owner of this state (state checks, main thread executor, archiving). */
    private final Context context;

    /** The execution graph this state operates on. */
    private final ExecutionGraph executionGraph;

    /** Handles input splits, partition state and checkpoint messages for the graph. */
    private final ExecutionGraphHandler executionGraphHandler;

    /** Delivers events and requests to operator coordinators; disposed in {@link #onLeave}. */
    private final OperatorCoordinatorHandler operatorCoordinatorHandler;

    /** Handles queryable-state (KvState) requests; created from the execution graph. */
    private final KvStateHandler kvStateHandler;

    /** Logger supplied by the concrete state. */
    private final Logger logger;

    /** Class loader passed to {@link TaskExecutionStateTransition#getError(ClassLoader)}. */
    private final ClassLoader userCodeClassLoader;

    /** Failures seen so far; a private copy of the list handed to the constructor. */
    private final List<ExceptionHistoryEntry> failureCollection;

    /** Tracks which tasks have reported end of data. */
    private final VertexEndOfDataListener vertexEndOfDataListener;

    /**
     * Creates the state and registers a callback on the execution graph's termination future.
     *
     * <p>When the graph terminates in a globally terminal {@link JobStatus}, the callback is
     * scheduled on the context's main thread executor and handed to {@link Context#runIfState}.
     * It archives the collected failures (if there are any) and then calls
     * {@link #onGloballyTerminalState(JobStatus)}.
     *
     * @param context callbacks into the owner of this state
     * @param executionGraph execution graph this state operates on
     * @param executionGraphHandler handler for split, partition and checkpoint messages
     * @param operatorCoordinatorHandler handler for operator coordinator communication
     * @param logger logger used by this state
     * @param userClassCodeLoader class loader used to obtain errors from task state transitions
     * @param failureCollection failures recorded so far; copied into a new list
     */
    StateWithExecutionGraph(
            Context context,
            ExecutionGraph executionGraph,
            ExecutionGraphHandler executionGraphHandler,
            OperatorCoordinatorHandler operatorCoordinatorHandler,
            Logger logger,
            ClassLoader userClassCodeLoader,
            List<ExceptionHistoryEntry> failureCollection) {
        this.context = context;
        this.executionGraph = executionGraph;
        this.executionGraphHandler = executionGraphHandler;
        this.operatorCoordinatorHandler = operatorCoordinatorHandler;
        this.kvStateHandler = new KvStateHandler(executionGraph);
        this.logger = logger;
        this.userCodeClassLoader = userClassCodeLoader;
        this.failureCollection = new ArrayList<>(failureCollection);
        this.vertexEndOfDataListener = new VertexEndOfDataListener(executionGraph);

        FutureUtils.assertNoException(
                executionGraph
                        .getTerminationFuture()
                        .thenAcceptAsync(
                                jobStatus -> {
                                    if (jobStatus.isGloballyTerminalState()) {
                                        context.runIfState(
                                                this,
                                                () -> {
                                                    convertFailures(this.failureCollection)
                                                            .ifPresent(context::archiveFailure);
                                                    onGloballyTerminalState(jobStatus);
                                                });
                                    }
                                },
                                context.getMainThreadExecutor()));
    }

    /** Returns the execution graph held by this state. */
    ExecutionGraph getExecutionGraph() {
        return executionGraph;
    }

    /** Returns the job ID reported by the execution graph. */
    JobID getJobId() {
        return executionGraph.getJobID();
    }

    /** Returns the operator coordinator handler passed to the constructor. */
    protected OperatorCoordinatorHandler getOperatorCoordinatorHandler() {
        return operatorCoordinatorHandler;
    }

    /** Returns the execution graph handler passed to the constructor. */
    protected ExecutionGraphHandler getExecutionGraphHandler() {
        return executionGraphHandler;
    }

    /**
     * Disposes all operator coordinators unless the next state is itself a
     * {@code StateWithExecutionGraph} (or a subclass of it).
     *
     * @param newState class of the state that is being entered
     */
    @Override
    public void onLeave(Class<? extends State> newState) {
        if (!StateWithExecutionGraph.class.isAssignableFrom(newState)) {
            operatorCoordinatorHandler.disposeAllOperatorCoordinators();
        }
    }

    /** Returns an archived snapshot of the execution graph with this state's job status. */
    @Override
    public ArchivedExecutionGraph getJob() {
        return ArchivedExecutionGraph.createFrom(executionGraph, getJobStatus());
    }

    /**
     * Suspends the execution graph and transitions to the finished state.
     *
     * @param cause reason for the suspension
     * @throws IllegalStateException if the graph is not in a terminal state after suspending
     */
    @Override
    public void suspend(Throwable cause) {
        executionGraph.suspend(cause);
        Preconditions.checkState(executionGraph.getState().isTerminalState());
        context.goToFinished(ArchivedExecutionGraph.createFrom(executionGraph));
    }

    /** Returns the logger passed to the constructor. */
    @Override
    public Logger getLogger() {
        return logger;
    }

    /** Delegates to {@link ExecutionGraphHandler#requestNextInputSplit}. */
    SerializedInputSplit requestNextInputSplit(
            JobVertexID vertexID, ExecutionAttemptID executionAttempt) throws IOException {
        return executionGraphHandler.requestNextInputSplit(vertexID, executionAttempt);
    }

    /** Delegates to {@link ExecutionGraphHandler#requestPartitionState}. */
    ExecutionState requestPartitionState(
            IntermediateDataSetID intermediateResultId, ResultPartitionID resultPartitionId)
            throws PartitionProducerDisposedException {
        return executionGraphHandler.requestPartitionState(
                intermediateResultId, resultPartitionId);
    }

    /** Delegates to {@link ExecutionGraphHandler#acknowledgeCheckpoint}. */
    void acknowledgeCheckpoint(
            JobID jobID,
            ExecutionAttemptID executionAttemptID,
            long checkpointId,
            CheckpointMetrics checkpointMetrics,
            TaskStateSnapshot checkpointState) {
        executionGraphHandler.acknowledgeCheckpoint(
                jobID, executionAttemptID, checkpointId, checkpointMetrics, checkpointState);
    }

    /** Delegates to {@link ExecutionGraphHandler#declineCheckpoint}. */
    void declineCheckpoint(DeclineCheckpoint decline) {
        executionGraphHandler.declineCheckpoint(decline);
    }

    /**
     * Records that a task reached end of data and, once all tasks have, triggers a checkpoint.
     *
     * <p>Nothing happens unless the graph has a checkpoint coordinator configuration with both
     * checkpointing and checkpoints-after-tasks-finish enabled. The future returned by
     * {@link #triggerCheckpoint(CheckpointType)} is not inspected here.
     *
     * @param executionAttemptID attempt that reported end of data
     */
    void notifyEndOfData(ExecutionAttemptID executionAttemptID) {
        final CheckpointCoordinatorConfiguration checkpointConfig =
                executionGraph.getCheckpointCoordinatorConfiguration();
        if (checkpointConfig != null
                && checkpointConfig.isCheckpointingEnabled()
                && checkpointConfig.isEnableCheckpointsAfterTasksFinish()) {
            vertexEndOfDataListener.recordTaskEndOfData(executionAttemptID);
            if (vertexEndOfDataListener.areAllTasksEndOfData()) {
                triggerCheckpoint(CheckpointType.CONFIGURED);
            }
        }
    }

    /** Delegates to {@link ExecutionGraphHandler#reportCheckpointMetrics}. */
    void reportCheckpointMetrics(
            ExecutionAttemptID executionAttemptID,
            long checkpointId,
            CheckpointMetrics checkpointMetrics) {
        executionGraphHandler.reportCheckpointMetrics(
                executionAttemptID, checkpointId, checkpointMetrics);
    }

    /** Forwards the accumulator snapshot to the execution graph. */
    void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) {
        executionGraph.updateAccumulators(accumulatorSnapshot);
    }

    /** Delegates to {@link KvStateHandler#requestKvStateLocation}. */
    KvStateLocation requestKvStateLocation(JobID jobId, String registrationName)
            throws FlinkJobNotFoundException, UnknownKvStateLocation {
        return kvStateHandler.requestKvStateLocation(jobId, registrationName);
    }

    /** Delegates to {@link KvStateHandler#notifyKvStateRegistered}. */
    void notifyKvStateRegistered(
            JobID jobId,
            JobVertexID jobVertexId,
            KeyGroupRange keyGroupRange,
            String registrationName,
            KvStateID kvStateId,
            InetSocketAddress kvStateServerAddress)
            throws FlinkJobNotFoundException {
        kvStateHandler.notifyKvStateRegistered(
                jobId,
                jobVertexId,
                keyGroupRange,
                registrationName,
                kvStateId,
                kvStateServerAddress);
    }

    /** Delegates to {@link KvStateHandler#notifyKvStateUnregistered}. */
    void notifyKvStateUnregistered(
            JobID jobId,
            JobVertexID jobVertexId,
            KeyGroupRange keyGroupRange,
            String registrationName)
            throws FlinkJobNotFoundException {
        kvStateHandler.notifyKvStateUnregistered(
                jobId, jobVertexId, keyGroupRange, registrationName);
    }

    /**
     * Triggers a savepoint and optionally cancels the job once it has been stored.
     *
     * <p>Preconditions are checked through
     * {@code StopWithSavepointTerminationManager.checkSavepointActionPreconditions}. If
     * {@code cancelJob} is set, the checkpoint scheduler is stopped before the savepoint is
     * triggered. The result is handled on the context's main thread executor:
     *
     * <ul>
     *   <li>on failure, the checkpoint scheduler is restarted (only if {@code cancelJob} is set
     *       and this is still the active state) and the error is rethrown wrapped in a
     *       {@link CompletionException};
     *   <li>on success, the job is cancelled (same condition) and the path is returned.
     * </ul>
     *
     * @param targetDirectory directory to write the savepoint to
     * @param cancelJob whether to cancel the job after a successful savepoint
     * @param formatType format of the savepoint
     * @return future completing with the external pointer of the completed savepoint
     */
    CompletableFuture<String> triggerSavepoint(
            String targetDirectory, boolean cancelJob, SavepointFormatType formatType) {
        final CheckpointCoordinator checkpointCoordinator =
                executionGraph.getCheckpointCoordinator();
        StopWithSavepointTerminationManager.checkSavepointActionPreconditions(
                checkpointCoordinator, targetDirectory, getJobId(), logger);

        logger.info(
                "Triggering {}savepoint for job {}.",
                cancelJob ? "cancel-with-" : "",
                executionGraph.getJobID());

        if (cancelJob) {
            checkpointCoordinator.stopCheckpointScheduler();
        }

        return checkpointCoordinator
                .triggerSavepoint(targetDirectory, formatType)
                .thenApply(CompletedCheckpoint::getExternalPointer)
                .handleAsync(
                        (path, throwable) -> {
                            if (throwable != null) {
                                if (cancelJob && context.isState(this)) {
                                    // The scheduler was stopped above; resume it since the
                                    // job keeps running.
                                    startCheckpointScheduler(checkpointCoordinator);
                                }
                                throw new CompletionException(throwable);
                            }
                            if (cancelJob && context.isState(this)) {
                                logger.info(
                                        "Savepoint stored in {}. Now cancelling {}.",
                                        path,
                                        executionGraph.getJobID());
                                cancel();
                            }
                            return path;
                        },
                        context.getMainThreadExecutor());
    }

    /**
     * Triggers a checkpoint of the given type.
     *
     * <p>The result is passed through unchanged, but the returned future is completed via the
     * context's main thread executor; failures are rethrown wrapped in a
     * {@link CompletionException}.
     *
     * @param checkpointType type of checkpoint to trigger
     * @return future completing with the completed checkpoint
     * @throws IllegalStateException if the execution graph has no checkpoint coordinator
     */
    CompletableFuture<CompletedCheckpoint> triggerCheckpoint(CheckpointType checkpointType) {
        final CheckpointCoordinator checkpointCoordinator =
                executionGraph.getCheckpointCoordinator();
        final JobID jobID = executionGraph.getJobID();
        if (checkpointCoordinator == null) {
            throw new IllegalStateException(
                    String.format("Job %s is not a streaming job.", jobID));
        }
        logger.info("Triggering a checkpoint for job {}.", jobID);

        return checkpointCoordinator
                .triggerCheckpoint(checkpointType)
                .handleAsync(
                        (completedCheckpoint, throwable) -> {
                            if (throwable != null) {
                                throw new CompletionException(throwable);
                            }
                            return completedCheckpoint;
                        },
                        context.getMainThreadExecutor());
    }

    /**
     * Restarts the periodic checkpoint scheduler if periodic checkpointing is configured.
     *
     * <p>This is best effort: an {@link IllegalStateException} from the coordinator is logged at
     * debug level and otherwise ignored, so the caller's own error handling is not disturbed.
     *
     * @param checkpointCoordinator coordinator whose scheduler should be started
     */
    private void startCheckpointScheduler(CheckpointCoordinator checkpointCoordinator) {
        if (!checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
            return;
        }
        try {
            checkpointCoordinator.startCheckpointScheduler();
        } catch (IllegalStateException e) {
            // NOTE(review): presumably the coordinator was shut down concurrently - confirm.
            logger.debug(
                    "Could not restart the checkpoint scheduler for job {}; ignoring.",
                    executionGraph.getJobID(),
                    e);
        }
    }

    /** Delegates to {@link OperatorCoordinatorHandler#deliverOperatorEventToCoordinator}. */
    void deliverOperatorEventToCoordinator(
            ExecutionAttemptID taskExecutionId, OperatorID operatorId, OperatorEvent evt)
            throws FlinkException {
        operatorCoordinatorHandler.deliverOperatorEventToCoordinator(
                taskExecutionId, operatorId, evt);
    }

    /** Delegates to {@link OperatorCoordinatorHandler#deliverCoordinationRequestToCoordinator}. */
    CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(
            OperatorID operatorId, CoordinationRequest request) throws FlinkException {
        return operatorCoordinatorHandler.deliverCoordinationRequestToCoordinator(
                operatorId, request);
    }

    /**
     * Called after a global failure or a task failure has been added to the failure collection.
     *
     * @param cause the failure cause
     */
    abstract void onFailure(Throwable cause);

    /**
     * Called when the execution graph's termination future completes with a globally terminal
     * job status, after the collected failures have been archived.
     *
     * @param globallyTerminalState the globally terminal job status
     */
    abstract void onGloballyTerminalState(JobStatus globallyTerminalState);

    /**
     * Records a global failure in the failure collection and forwards it to
     * {@link #onFailure(Throwable)}.
     *
     * @param cause the failure cause
     * @param failureLabels future with the labels attached to the failure
     */
    @Override
    public void handleGlobalFailure(
            Throwable cause, CompletableFuture<Map<String, String>> failureLabels) {
        failureCollection.add(ExceptionHistoryEntry.createGlobal(cause, failureLabels));
        onFailure(cause);
    }

    /**
     * Applies a task state transition to the execution graph and records task failures.
     *
     * <p>If the update succeeded, the requested state is {@link ExecutionState#FAILED} and the
     * execution is actually in that state afterwards, a failure entry is added to the failure
     * collection and {@link #onFailure(Throwable)} is invoked.
     *
     * @param taskExecutionStateTransition the transition reported for a task
     * @param failureLabels future with the labels attached to a potential failure
     * @return whether the execution graph accepted the update
     * @throws NoSuchElementException if a successful FAILED update refers to an attempt for which
     *     no execution or task name was found before the update
     */
    boolean updateTaskExecutionState(
            TaskExecutionStateTransition taskExecutionStateTransition,
            CompletableFuture<Map<String, String>> failureLabels) {
        // Both lookups are done before the update is applied, as in the original code.
        // NOTE(review): presumably the graph may no longer resolve the attempt afterwards.
        final Optional<AccessExecution> maybeExecution =
                executionGraph.findExecution(taskExecutionStateTransition.getID());
        final Optional<String> maybeTaskName =
                executionGraph.findVertexWithAttempt(taskExecutionStateTransition.getID());

        final ExecutionState desiredState = taskExecutionStateTransition.getExecutionState();
        final boolean successfulUpdate =
                getExecutionGraph().updateState(taskExecutionStateTransition);

        if (successfulUpdate && desiredState == ExecutionState.FAILED) {
            final AccessExecution execution =
                    maybeExecution.orElseThrow(
                            () ->
                                    new NoSuchElementException(
                                            "No execution found for attempt "
                                                    + taskExecutionStateTransition.getID()
                                                    + "."));
            final String taskName =
                    maybeTaskName.orElseThrow(
                            () ->
                                    new NoSuchElementException(
                                            "No task name found for attempt "
                                                    + taskExecutionStateTransition.getID()
                                                    + "."));
            final ExecutionState currentState = execution.getState();
            if (currentState == desiredState) {
                failureCollection.add(
                        ExceptionHistoryEntry.create(execution, taskName, failureLabels));
                onFailure(
                        ErrorInfo.handleMissingThrowable(
                                taskExecutionStateTransition.getError(userCodeClassLoader)));
            }
        }
        return successfulUpdate;
    }

    /**
     * Returns the live, mutable failure collection of this state (not a copy).
     *
     * <p>Note that the first entry is removed from this list when the failures are archived on
     * reaching a globally terminal state.
     */
    List<ExceptionHistoryEntry> getFailures() {
        return failureCollection;
    }

    /**
     * Builds a root exception history entry from the given failures.
     *
     * <p>The first element is removed from {@code failureCollection} and becomes the root; the
     * same (now shortened) list is passed along as the remaining entries. The argument is
     * therefore modified by this call.
     *
     * @param failureCollection failures to convert; mutated when non-empty
     * @return the root entry, or an empty {@link Optional} if there are no failures
     */
    private static Optional<RootExceptionHistoryEntry> convertFailures(
            List<ExceptionHistoryEntry> failureCollection) {
        if (failureCollection.isEmpty()) {
            return Optional.empty();
        }
        final ExceptionHistoryEntry first = failureCollection.remove(0);
        return Optional.of(
                RootExceptionHistoryEntry.fromExceptionHistoryEntry(first, failureCollection));
    }

    /** Callbacks this state requires from its owner. */
    interface Context extends StateTransitions.ToFinished {

        /**
         * Runs {@code action} on behalf of {@code expectedState}.
         *
         * <p>NOTE(review): judging by the name, the action presumably only runs while {@code
         * expectedState} is still the current state - confirm against the implementation.
         *
         * @param expectedState state on whose behalf the action is submitted
         * @param action action to run
         */
        void runIfState(State expectedState, Runnable action);

        /**
         * NOTE(review): presumably reports whether {@code expectedState} is the current state -
         * confirm against the implementation.
         *
         * @param expectedState state to check
         * @return result of the check
         */
        boolean isState(State expectedState);

        /** Returns the executor on which this state schedules its asynchronous callbacks. */
        Executor getMainThreadExecutor();

        /**
         * Receives the root failure entry built when the job reaches a globally terminal state.
         *
         * @param failure root exception history entry to archive
         */
        void archiveFailure(RootExceptionHistoryEntry failure);
    }
}

