package org.apache.flink.runtime.taskexecutor;

import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.runtime.blob.NoOpTaskExecutorBlobService;
import org.apache.flink.runtime.blob.TaskExecutorBlobService;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTrackerImpl;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.rpc.MainThreadExecutable;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
import org.apache.flink.runtime.taskexecutor.rpc.RpcResultPartitionConsumableNotifier;
import org.apache.flink.runtime.taskexecutor.slot.DefaultTimerService;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils;
import org.apache.flink.runtime.taskexecutor.slot.TestingTaskSlotTable;
import org.apache.flink.runtime.taskexecutor.slot.ThreadSafeTaskSlotTable;
import org.apache.flink.runtime.taskexecutor.slot.TimerService;
import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
import org.apache.flink.runtime.util.ConfigurationParserUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.testutils.TestFileUtils;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Reference;
import org.apache.flink.util.concurrent.Executors;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.class */
/**
 * Test environment that starts a {@link TestingTaskExecutor} on a {@link TestingRpcService} and
 * registers a job master connection for a single job, so that tests can submit tasks to it.
 *
 * <p>Instances are created through the nested {@link Builder}. The environment owns the RPC
 * service, the timer service, the blob service handle and a temporary folder; all of them are
 * released by {@link #close()}.
 */
class TaskSubmissionTestEnvironment implements AutoCloseable {
    private final HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L);
    private final TestingRpcService testingRpcService;
    private final TaskExecutorBlobService taskExecutorBlobService =
            NoOpTaskExecutorBlobService.INSTANCE;
    private final Time timeout = Time.milliseconds(10000L);
    private final TestingFatalErrorHandler testingFatalErrorHandler =
            new TestingFatalErrorHandler();
    // Declared after 'timeout' on purpose: the initializer reads it.
    private final TimerService<AllocationID> timerService =
            new DefaultTimerService<>(TestingUtils.defaultExecutor(), timeout.toMilliseconds());
    private final TestingHighAvailabilityServices haServices;
    private final TemporaryFolder temporaryFolder;
    private final ThreadSafeTaskSlotTable<Task> threadSafeTaskSlotTable;
    private final JobMasterId jobMasterId;
    private final TestingTaskExecutor taskExecutor;

    /** Fluent builder for {@link TaskSubmissionTestEnvironment}. */
    public static final class Builder {
        private final JobID jobId;
        // A slot size of 0 (the default) makes the environment use a TestingTaskSlotTable stub.
        private int slotSize;
        private TestingJobMasterGateway jobMasterGateway;

        @Nullable
        private String metricQueryServiceAddress;
        private boolean mockShuffleEnvironment = true;
        private JobMasterId jobMasterId = JobMasterId.generate();
        private boolean localCommunication = true;
        private Configuration configuration = new Configuration();
        private Optional<ShuffleEnvironment<?, ?>> optionalShuffleEnvironment = Optional.empty();
        private ResourceID resourceID = ResourceID.generate();
        private final List<Tuple3<ExecutionAttemptID, ExecutionState, CompletableFuture<Void>>>
                taskManagerActionListeners = new ArrayList<>();

        public Builder(JobID jobID) {
            this.jobId = jobID;
        }

        public Builder setMetricQueryServiceAddress(String str) {
            this.metricQueryServiceAddress = str;
            return this;
        }

        /**
         * Requests a real (non-mocked) shuffle environment that is created during {@link
         * #build()}; discards any environment set via {@link #setShuffleEnvironment}.
         */
        public Builder useRealNonMockShuffleEnvironment() {
            this.optionalShuffleEnvironment = Optional.empty();
            this.mockShuffleEnvironment = false;
            return this;
        }

        /** Uses the given, caller-provided shuffle environment instead of creating one. */
        public Builder setShuffleEnvironment(ShuffleEnvironment<?, ?> shuffleEnvironment) {
            this.mockShuffleEnvironment = false;
            this.optionalShuffleEnvironment = Optional.of(shuffleEnvironment);
            return this;
        }

        public Builder setSlotSize(int i) {
            this.slotSize = i;
            return this;
        }

        public Builder setJobMasterId(JobMasterId jobMasterId) {
            this.jobMasterId = jobMasterId;
            return this;
        }

        public Builder setJobMasterGateway(TestingJobMasterGateway testingJobMasterGateway) {
            this.jobMasterGateway = testingJobMasterGateway;
            return this;
        }

        public Builder setLocalCommunication(boolean z) {
            this.localCommunication = z;
            return this;
        }

        public Builder setConfiguration(Configuration configuration) {
            this.configuration = configuration;
            return this;
        }

        /**
         * Registers a future to be handed to the environment's task manager actions together
         * with the given execution attempt and execution state.
         */
        public Builder addTaskManagerActionListener(
                ExecutionAttemptID executionAttemptID,
                ExecutionState executionState,
                CompletableFuture<Void> completableFuture) {
            this.taskManagerActionListeners.add(
                    Tuple3.of(executionAttemptID, executionState, completableFuture));
            return this;
        }

        public Builder setResourceID(ResourceID resourceID) {
            this.resourceID = resourceID;
            return this;
        }

        /**
         * Creates the RPC service, resolves the shuffle environment and builds the test
         * environment.
         *
         * <p>If anything fails after the RPC service was created, the service is stopped before
         * the failure is propagated so that it does not leak.
         *
         * @return the started test environment
         * @throws Exception if the environment could not be set up
         */
        public TaskSubmissionTestEnvironment build() throws Exception {
            final TestingRpcService testingRpcService = new TestingRpcService();
            try {
                final ShuffleEnvironment<?, ?> shuffleEnvironment =
                        this.optionalShuffleEnvironment.orElseGet(
                                () -> {
                                    try {
                                        return TaskSubmissionTestEnvironment
                                                .createShuffleEnvironment(
                                                        this.resourceID,
                                                        this.localCommunication,
                                                        this.configuration,
                                                        testingRpcService,
                                                        this.mockShuffleEnvironment);
                                    } catch (Exception e) {
                                        throw new FlinkRuntimeException(
                                                "Failed to build TaskSubmissionTestEnvironment",
                                                e);
                                    }
                                });
                return new TaskSubmissionTestEnvironment(
                        this.jobId,
                        this.jobMasterId,
                        this.slotSize,
                        this.jobMasterGateway,
                        this.configuration,
                        this.taskManagerActionListeners,
                        this.metricQueryServiceAddress,
                        testingRpcService,
                        shuffleEnvironment);
            } catch (Throwable t) {
                // The environment never took ownership of the RPC service, so stop it here.
                try {
                    testingRpcService.stopService().join();
                } catch (Throwable suppressed) {
                    t.addSuppressed(suppressed);
                }
                throw t;
            }
        }
    }

    private TaskSubmissionTestEnvironment(
            JobID jobId,
            JobMasterId jobMasterId,
            int slotSize,
            TestingJobMasterGateway testingJobMasterGateway,
            Configuration configuration,
            List<Tuple3<ExecutionAttemptID, ExecutionState, CompletableFuture<Void>>>
                    taskManagerActionListeners,
            @Nullable String metricQueryServiceAddress,
            TestingRpcService testingRpcService,
            ShuffleEnvironment<?, ?> shuffleEnvironment)
            throws Exception {
        this.haServices = new TestingHighAvailabilityServices();
        this.haServices.setResourceManagerLeaderRetriever(new SettableLeaderRetrievalService());
        this.haServices.setJobMasterLeaderRetriever(jobId, new SettableLeaderRetrievalService());

        this.temporaryFolder = new TemporaryFolder();
        this.temporaryFolder.create();

        this.jobMasterId = jobMasterId;

        // Declared via the TaskSlotTable interface: the two branches produce unrelated
        // implementations (a real table vs. a testing stub).
        final TaskSlotTable<Task> taskSlotTable;
        if (slotSize > 0) {
            taskSlotTable = TaskSlotUtils.createTaskSlotTable(slotSize);
        } else {
            taskSlotTable =
                    TestingTaskSlotTable.<Task>newBuilder()
                            .tryMarkSlotActiveReturns(true)
                            .addTaskReturns(true)
                            .closeAsyncReturns(CompletableFuture.completedFuture(null))
                            .allocateSlotReturns(true)
                            .memoryManagerGetterReturns(null)
                            .build();
        }

        // Fall back to a gateway that only answers with the configured fencing token.
        final TestingJobMasterGateway jobMasterGateway =
                testingJobMasterGateway != null
                        ? testingJobMasterGateway
                        : new TestingJobMasterGatewayBuilder()
                                .setFencingTokenSupplier(() -> jobMasterId)
                                .build();

        this.testingRpcService = testingRpcService;

        final JobTable jobTable = DefaultJobTable.create();
        final TaskExecutorLocalStateStoresManager localStateStoresManager =
                new TaskExecutorLocalStateStoresManager(
                        false,
                        Reference.owned(new File[] {this.temporaryFolder.newFolder()}),
                        Executors.directExecutor());
        final TaskManagerServices taskManagerServices =
                new TaskManagerServicesBuilder()
                        .setShuffleEnvironment(shuffleEnvironment)
                        .setTaskSlotTable(taskSlotTable)
                        .setJobTable(jobTable)
                        .setTaskStateManager(localStateStoresManager)
                        .build();

        this.taskExecutor =
                createTaskExecutor(taskManagerServices, metricQueryServiceAddress, configuration);
        this.taskExecutor.start();
        this.taskExecutor.waitUntilStarted();

        this.threadSafeTaskSlotTable =
                new ThreadSafeTaskSlotTable<>(
                        taskSlotTable, this.taskExecutor.getMainThreadExecutableForTesting());

        final TaskManagerActions taskManagerActions;
        if (taskManagerActionListeners.isEmpty()) {
            taskManagerActions = new NoOpTaskManagerActions();
        } else {
            final TestTaskManagerActions testTaskManagerActions =
                    new TestTaskManagerActions(this.threadSafeTaskSlotTable, jobMasterGateway);
            for (Tuple3<ExecutionAttemptID, ExecutionState, CompletableFuture<Void>> listener :
                    taskManagerActionListeners) {
                testTaskManagerActions.addListener(listener.f0, listener.f1, listener.f2);
            }
            taskManagerActions = testTaskManagerActions;
        }

        registerJobMasterConnection(
                jobTable,
                jobId,
                testingRpcService,
                jobMasterGateway,
                taskManagerActions,
                this.timeout,
                this.taskExecutor.getMainThreadExecutableForTesting());
    }

    /**
     * Schedules, via the given main thread executable, the creation (or lookup) of the job in
     * the job table and its connection to the given job master gateway.
     *
     * <p>The connection uses a freshly generated {@link ResourceID}, test implementations of the
     * checkpoint responder and global aggregate manager, and a partition producer state checker
     * that always reports {@link ExecutionState#RUNNING}.
     *
     * @param jobTable job table in which the job is created or looked up
     * @param jobID id of the job to connect
     * @param rpcService RPC service whose scheduled executor backs the partition notifier
     * @param jobMasterGateway gateway of the job master to connect to
     * @param taskManagerActions task manager actions handed to the connection
     * @param time timeout passed to the result partition consumable notifier
     * @param mainThreadExecutable executable on which the registration is run asynchronously
     */
    public static void registerJobMasterConnection(
            JobTable jobTable,
            JobID jobID,
            RpcService rpcService,
            JobMasterGateway jobMasterGateway,
            TaskManagerActions taskManagerActions,
            Time time,
            MainThreadExecutable mainThreadExecutable) {
        mainThreadExecutable.runAsync(
                () -> {
                    jobTable.getOrCreateJob(jobID, () -> TestingJobServices.newBuilder().build())
                            .connect(
                                    ResourceID.generate(),
                                    jobMasterGateway,
                                    taskManagerActions,
                                    new TestCheckpointResponder(),
                                    new TestGlobalAggregateManager(),
                                    new RpcResultPartitionConsumableNotifier(
                                            jobMasterGateway,
                                            rpcService.getScheduledExecutor(),
                                            time),
                                    TestingPartitionProducerStateChecker.newBuilder()
                                            .setPartitionProducerStateFunction(
                                                    (ignoredJobId,
                                                            ignoredDataSetId,
                                                            ignoredPartitionId) ->
                                                            CompletableFuture.completedFuture(
                                                                    ExecutionState.RUNNING))
                                            .build());
                });
    }

    public TestingTaskExecutor getTaskExecutor() {
        return this.taskExecutor;
    }

    public TaskExecutorGateway getTaskExecutorGateway() {
        return this.taskExecutor.getSelfGateway(TaskExecutorGateway.class);
    }

    /** Returns the slot table wrapped so that calls go through the task executor main thread. */
    public TaskSlotTable<Task> getTaskSlotTable() {
        return this.threadSafeTaskSlotTable;
    }

    public JobMasterId getJobMasterId() {
        return this.jobMasterId;
    }

    public TestingFatalErrorHandler getTestingFatalErrorHandler() {
        return this.testingFatalErrorHandler;
    }

    /**
     * Instantiates the {@link TestingTaskExecutor} on this environment's RPC service.
     *
     * <p>The task manager configuration is derived from a copy of the given configuration, the
     * loopback host address and a freshly created temporary directory.
     */
    @Nonnull
    private TestingTaskExecutor createTaskExecutor(
            TaskManagerServices taskManagerServices,
            @Nullable String metricQueryServiceAddress,
            Configuration configuration)
            throws IOException {
        // Work on a copy so the caller's configuration object is not handed on directly.
        final Configuration configurationCopy = new Configuration(configuration);
        final TaskManagerConfiguration taskManagerConfiguration =
                TaskManagerConfiguration.fromConfiguration(
                        configurationCopy,
                        TaskExecutorResourceUtils.resourceSpecFromConfigForLocalExecution(
                                configurationCopy),
                        InetAddress.getLoopbackAddress().getHostAddress(),
                        TestFileUtils.createTempDir());

        return new TestingTaskExecutor(
                this.testingRpcService,
                taskManagerConfiguration,
                this.haServices,
                taskManagerServices,
                ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
                this.heartbeatServices,
                UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
                metricQueryServiceAddress,
                this.taskExecutorBlobService,
                this.testingFatalErrorHandler,
                new TaskExecutorPartitionTrackerImpl(taskManagerServices.getShuffleEnvironment()));
    }

    /**
     * Creates the shuffle environment used by the task executor.
     *
     * @param resourceID task manager location passed to the Netty shuffle environment
     * @param z {@code true} for local communication, in which case no {@link NettyConfig} is set
     * @param configuration source of the data port, backoff, page size and slot settings
     * @param rpcService RPC service whose address is used for the Netty socket address
     * @param z2 {@code true} to return a Mockito mock instead of a real environment
     * @return a mock, or a started Netty shuffle environment
     * @throws Exception if the address cannot be resolved or the environment fails to start
     */
    public static ShuffleEnvironment<?, ?> createShuffleEnvironment(
            ResourceID resourceID,
            boolean z,
            Configuration configuration,
            RpcService rpcService,
            boolean z2)
            throws Exception {
        if (z2) {
            return Mockito.mock(ShuffleEnvironment.class, Mockito.RETURNS_MOCKS);
        }

        final InetSocketAddress socketAddress =
                new InetSocketAddress(
                        InetAddress.getByName(rpcService.getAddress()),
                        configuration.getInteger(NettyShuffleEnvironmentOptions.DATA_PORT));

        final NettyConfig nettyConfig =
                z
                        ? null
                        : new NettyConfig(
                                socketAddress.getAddress(),
                                socketAddress.getPort(),
                                ConfigurationParserUtils.getPageSize(configuration),
                                ConfigurationParserUtils.getSlot(configuration),
                                configuration);

        final ShuffleEnvironment<?, ?> shuffleEnvironment =
                new NettyShuffleEnvironmentBuilder()
                        .setTaskManagerLocation(resourceID)
                        .setPartitionRequestInitialBackoff(
                                configuration.getInteger(
                                        NettyShuffleEnvironmentOptions
                                                .NETWORK_REQUEST_BACKOFF_INITIAL))
                        .setPartitionRequestMaxBackoff(
                                configuration.getInteger(
                                        NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX))
                        .setNettyConfig(nettyConfig)
                        .build();
        shuffleEnvironment.start();
        return shuffleEnvironment;
    }

    /**
     * Stops the RPC service and the timer service, closes the blob service and deletes the
     * temporary folder, then rethrows any error recorded by the fatal error handler.
     *
     * <p>Every cleanup step runs even if an earlier one fails, so a failing shutdown does not
     * leave the remaining resources behind.
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        try {
            this.testingRpcService.stopService().join();
        } finally {
            try {
                this.timerService.stop();
            } finally {
                try {
                    this.taskExecutorBlobService.close();
                } finally {
                    this.temporaryFolder.delete();
                }
            }
        }
        this.testingFatalErrorHandler.rethrowError();
    }
}
