package org.apache.flink.runtime.dispatcher;

import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.TaskManagerRegistrationInformation;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.state.OperatorStreamStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
import org.apache.flink.shaded.guava32.com.google.common.collect.Iterables;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.util.concurrent.FutureUtils;

/* loaded from: input_file:org/apache/flink/runtime/dispatcher/JobMasterTester.class */
/**
 * Test helper that drives a {@link JobMasterGateway} by acting as its task executor.
 *
 * <p>On construction a testing {@link TaskExecutorGateway} is created and registered with the
 * supplied {@link TestingRpcService}. Submitted {@link TaskDeploymentDescriptor}s are collected,
 * triggered checkpoints are acknowledged with a non-empty state snapshot, and checkpoint
 * confirmations are tracked per checkpoint id. {@link #close()} unregisters the gateway again.
 */
public class JobMasterTester implements Closeable {
    /** Timeout passed to the job master RPC calls issued by this helper. */
    private static final Duration TIMEOUT = Duration.ofMinutes(1);

    private final TestingRpcService rpcService;
    private final JobID jobId;
    private final JobMasterGateway jobMasterGateway;
    private final TaskExecutorGateway taskExecutorGateway;
    private final UnresolvedTaskManagerLocation taskManagerLocation = new LocalUnresolvedTaskManagerLocation();

    /** Descriptors received through task submission so far, keyed by execution attempt. */
    private final ConcurrentMap<ExecutionAttemptID, TaskDeploymentDescriptor> descriptors = new ConcurrentHashMap<>();

    /** Completed once a descriptor has been received for every execution vertex of the job. */
    private final CompletableFuture<List<TaskDeploymentDescriptor>> descriptorsFuture = new CompletableFuture<>();

    /** Completion handlers keyed by checkpoint id, created lazily on first access. */
    private final ConcurrentMap<Long, CheckpointCompletionHandler> checkpoints = new ConcurrentHashMap<>();

    /**
     * Tracks, for a single checkpoint, which execution attempts have had the checkpoint
     * confirmed.
     */
    public static class CheckpointCompletionHandler {
        private final Map<ExecutionAttemptID, CompletableFuture<Void>> completedAttemptFutures;
        private final CompletableFuture<Void> completedFuture;

        /**
         * Creates one pending future per execution attempt of the given descriptors.
         *
         * @param list descriptors of all deployed tasks
         */
        public CheckpointCompletionHandler(List<TaskDeploymentDescriptor> list) {
            this.completedAttemptFutures = list.stream()
                    .map(TaskDeploymentDescriptor::getExecutionAttemptId)
                    .collect(Collectors.toMap(Function.identity(), ignored -> new CompletableFuture<Void>()));
            this.completedFuture = FutureUtils.completeAll(this.completedAttemptFutures.values());
        }

        /**
         * Marks the given attempt as completed for this checkpoint.
         *
         * @param executionAttemptID attempt to complete; must be one of the attempts this handler
         *     was created with, otherwise the map lookup yields {@code null} and a
         *     {@link NullPointerException} is thrown
         */
        void completeAttempt(ExecutionAttemptID executionAttemptID) {
            this.completedAttemptFutures.get(executionAttemptID).complete(null);
        }

        /** Returns the future built by {@code FutureUtils.completeAll} over all attempt futures. */
        CompletableFuture<Void> getCompletedFuture() {
            return this.completedFuture;
        }
    }

    /**
     * Builds a snapshot holding a single managed operator state entry for the operator derived
     * from the task's job vertex id.
     */
    private static TaskStateSnapshot createNonEmptyStateSnapshot(TaskInformation taskInformation) {
        final OperatorID operatorId = OperatorID.fromJobVertexID(taskInformation.getJobVertexId());
        final OperatorStreamStateHandle stateHandle =
                new OperatorStreamStateHandle(Collections.emptyMap(), new ByteStreamStateHandle("foobar", new byte[0]));
        final OperatorSubtaskState subtaskState =
                OperatorSubtaskState.builder().setManagedOperatorState(stateHandle).build();

        final TaskStateSnapshot snapshot = new TaskStateSnapshot();
        snapshot.putSubtaskStateByOperatorID(operatorId, subtaskState);
        return snapshot;
    }

    /**
     * Creates the tester and registers its testing task executor gateway with the RPC service.
     *
     * @param testingRpcService RPC service the testing gateway is registered with
     * @param jobID id of the job under test
     * @param jobMasterGateway gateway of the job master under test
     */
    public JobMasterTester(TestingRpcService testingRpcService, JobID jobID, JobMasterGateway jobMasterGateway) {
        this.rpcService = testingRpcService;
        this.jobId = jobID;
        this.jobMasterGateway = jobMasterGateway;
        // Must come last: creating the gateway registers it with rpcService, which therefore has
        // to be assigned already. A field initializer would run before the assignments above.
        this.taskExecutorGateway = createTaskExecutorGateway();
    }

    /**
     * Reports the given execution state to the job master for every listed task.
     *
     * @param list descriptors of the tasks to transition
     * @param executionState state to report for each task
     * @return future yielding an {@link Acknowledge} once all state updates have finished
     */
    public CompletableFuture<Acknowledge> transitionTo(List<TaskDeploymentDescriptor> list, ExecutionState executionState) {
        final List<CompletableFuture<Acknowledge>> stateUpdates = list.stream()
                .map(TaskDeploymentDescriptor::getExecutionAttemptId)
                .map(attemptId ->
                        this.jobMasterGateway.updateTaskExecutionState(new TaskExecutionState(attemptId, executionState)))
                .collect(Collectors.toList());
        return FutureUtils.completeAll(stateUpdates).thenApply(ignored -> Acknowledge.get());
    }

    /**
     * Registers this tester as a task manager at the job master, offers slots, and waits for the
     * task deployments.
     *
     * @param i number of slots to offer
     * @return future yielding the collected deployment descriptors
     */
    public CompletableFuture<List<TaskDeploymentDescriptor>> deployVertices(int i) {
        final TaskManagerRegistrationInformation registration = TaskManagerRegistrationInformation.create(
                this.taskExecutorGateway.getAddress(), this.taskManagerLocation, TestingUtils.zeroUUID());
        return this.jobMasterGateway
                .registerTaskManager(this.jobId, registration, TIMEOUT)
                .thenCompose(ignored -> offerSlots(i))
                .thenCompose(ignored -> this.descriptorsFuture);
    }

    /**
     * Returns the completion future of the handler tracking the given checkpoint, creating the
     * handler if none exists yet.
     *
     * @param j checkpoint id
     * @return future obtained from the checkpoint's {@link CheckpointCompletionHandler}; it is only
     *     resolved after the deployment descriptors are available
     */
    public CompletableFuture<Void> getCheckpointFuture(long j) {
        return this.descriptorsFuture.thenCompose(deployed ->
                this.checkpoints
                        .computeIfAbsent(j, ignored -> new CheckpointCompletionHandler(deployed))
                        .getCompletedFuture());
    }

    /** Unregisters the testing task executor gateway from the RPC service. */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.rpcService.unregisterGateway(this.taskExecutorGateway.getAddress());
    }

    /**
     * Builds the testing task executor gateway wired to this tester's callbacks and registers it
     * with the RPC service under its own address.
     */
    private TaskExecutorGateway createTaskExecutorGateway() {
        final TaskExecutorGateway gateway = new TestingTaskExecutorGatewayBuilder()
                .setSubmitTaskConsumer(this::onSubmitTaskConsumer)
                .setTriggerCheckpointFunction(this::onTriggerCheckpoint)
                .setConfirmCheckpointFunction(this::onConfirmCheckpoint)
                .createTestingTaskExecutorGateway();
        this.rpcService.registerGateway(gateway.getAddress(), gateway);
        return gateway;
    }

    /**
     * Looks up the task information of the given attempt among the deployed descriptors.
     *
     * <p>The returned future fails with an {@link IllegalStateException} if no descriptor matches
     * the attempt, or if reading the task information from the descriptor throws (the original
     * exception is kept as the cause).
     */
    private CompletableFuture<TaskInformation> getTaskInformation(ExecutionAttemptID executionAttemptId) {
        return this.descriptorsFuture.thenApply(deployed -> {
            // Resolve the descriptor outside the try block so that a missing descriptor is not
            // misreported as a deserialization failure.
            final TaskDeploymentDescriptor descriptor = deployed.stream()
                    .filter(candidate -> executionAttemptId.equals(candidate.getExecutionAttemptId()))
                    .findAny()
                    .orElseThrow(() -> new IllegalStateException(
                            String.format("Task descriptor for %s not found.", executionAttemptId)));
            try {
                return descriptor.getTaskInformation();
            } catch (Exception e) {
                throw new IllegalStateException(
                        String.format("Unable to deserialize task information of %s.", executionAttemptId), e);
            }
        });
    }

    /**
     * Checkpoint trigger callback: acknowledges the checkpoint at the job master with a non-empty
     * state snapshot. The timestamp and checkpoint options are not used.
     */
    private CompletableFuture<Acknowledge> onTriggerCheckpoint(
            ExecutionAttemptID executionAttemptId, long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
        return getTaskInformation(executionAttemptId).thenApply(taskInformation -> {
            this.jobMasterGateway.acknowledgeCheckpoint(
                    this.jobId,
                    executionAttemptId,
                    checkpointId,
                    new CheckpointMetrics(),
                    TaskStateSnapshot.serializeTaskStateSnapshot(createNonEmptyStateSnapshot(taskInformation)));
            return Acknowledge.get();
        });
    }

    /**
     * Checkpoint confirmation callback: marks the attempt as completed for the checkpoint once
     * its task information has been resolved. The third argument is not used.
     */
    private CompletableFuture<Acknowledge> onConfirmCheckpoint(
            ExecutionAttemptID executionAttemptId, long checkpointId, long unused) {
        return getTaskInformation(executionAttemptId)
                .thenCompose(ignored -> completeAttemptCheckpoint(checkpointId, executionAttemptId));
    }

    /**
     * Task submission callback: records the descriptor and completes {@link #descriptorsFuture}
     * once as many descriptors were collected as the job has execution vertices.
     */
    private CompletableFuture<Acknowledge> onSubmitTaskConsumer(
            TaskDeploymentDescriptor taskDeploymentDescriptor, JobMasterId jobMasterId) {
        return this.jobMasterGateway.requestJob(TIMEOUT).thenApply(executionGraphInfo -> {
            final int expectedDescriptors =
                    Iterables.size(executionGraphInfo.getArchivedExecutionGraph().getAllExecutionVertices());
            this.descriptors.put(taskDeploymentDescriptor.getExecutionAttemptId(), taskDeploymentDescriptor);
            if (this.descriptors.size() == expectedDescriptors) {
                this.descriptorsFuture.complete(new ArrayList<>(this.descriptors.values()));
            }
            return Acknowledge.get();
        });
    }

    /**
     * Marks the given attempt as completed in the handler of the given checkpoint, creating the
     * handler if none exists yet.
     */
    private CompletableFuture<Acknowledge> completeAttemptCheckpoint(long checkpointId, ExecutionAttemptID executionAttemptId) {
        return this.descriptorsFuture.thenApply(deployed -> {
            this.checkpoints
                    .computeIfAbsent(checkpointId, ignored -> new CheckpointCompletionHandler(deployed))
                    .completeAttempt(executionAttemptId);
            return Acknowledge.get();
        });
    }

    /**
     * Offers the given number of slots to the job master, each with a fresh allocation id, slot
     * index 0 and {@link ResourceProfile#ANY}.
     */
    private CompletableFuture<Collection<SlotOffer>> offerSlots(int numSlots) {
        final List<SlotOffer> offers = new ArrayList<>();
        for (int slot = 0; slot < numSlots; slot++) {
            offers.add(new SlotOffer(new AllocationID(), 0, ResourceProfile.ANY));
        }
        return this.jobMasterGateway.offerSlots(this.taskManagerLocation.getResourceID(), offers, TIMEOUT);
    }
}
