/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.jobsubmission;

import java.io.IOException;
import java.nio.file.Paths;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService;
import org.apache.beam.runners.jobsubmission.InMemoryJobService;
import org.apache.beam.runners.jobsubmission.JobInvoker;
import org.apache.beam.sdk.expansion.service.ExpansionServer;
import org.apache.beam.sdk.expansion.service.ExpansionService;
import org.apache.beam.sdk.fn.server.FnService;
import org.apache.beam.sdk.fn.server.GrpcFnServer;
import org.apache.beam.sdk.fn.server.ServerFactory;
import org.apache.beam.sdk.function.ThrowingConsumer;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.kohsuke.args4j.Option;
import org.kohsuke.args4j.spi.ExplicitBooleanOptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class JobServerDriver
implements Runnable {
    // Logger shared by all instances of this driver.
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(JobServerDriver.class);
    // Server settings (host, ports, staging path, ...). Public and non-final; marked @VisibleForTesting.
    @VisibleForTesting
    public @UnknownKeyFor @NonNull @Initialized ServerConfiguration configuration;
    // Factories used when creating the gRPC servers for the job service and the artifact staging service.
    private final @UnknownKeyFor @NonNull @Initialized ServerFactory jobServerFactory;
    private final @UnknownKeyFor @NonNull @Initialized ServerFactory artifactServerFactory;
    // Supplies the JobInvoker that createJobService() hands to the InMemoryJobService.
    private final @UnknownKeyFor @NonNull @Initialized JobInvokerFactory jobInvokerFactory;
    // References to the servers owned by this driver. They are not set by the constructor and
    // stop() resets each one to null after closing it, so despite the @NonNull annotations
    // readers must be prepared to see null here.
    private volatile @UnknownKeyFor @NonNull @Initialized GrpcFnServer<@UnknownKeyFor @NonNull @Initialized InMemoryJobService> jobServer;
    private volatile @UnknownKeyFor @NonNull @Initialized GrpcFnServer<@UnknownKeyFor @NonNull @Initialized ArtifactStagingService> artifactStagingServer;
    // Additionally null whenever createExpansionService() returns null (negative expansion port).
    private volatile @UnknownKeyFor @NonNull @Initialized ExpansionServer expansionServer;

    /**
     * Starts the artifact staging server and the expansion server, then builds the job service
     * that uses them.
     *
     * <p>Each server is stored in its field as soon as it has been created, so {@link #stop()}
     * can close whatever was started even if a later step in this method throws.
     *
     * @return a new {@link InMemoryJobService} backed by the freshly created artifact staging server
     * @throws IOException if one of the supporting servers cannot be created
     */
    protected @UnknownKeyFor @NonNull @Initialized InMemoryJobService createJobService() throws @UnknownKeyFor @NonNull @Initialized IOException {
        // Hold the staging server in a local as well as in the field. stop() resets the field to
        // null, and the cleanup callback below can still be invoked afterwards; reading the field
        // at that point would throw a NullPointerException instead of removing the artifacts.
        GrpcFnServer<ArtifactStagingService> stagingServer = this.createArtifactStagingService();
        this.artifactStagingServer = stagingServer;
        this.expansionServer = this.createExpansionService();
        JobInvoker invoker = this.jobInvokerFactory.create();
        // The cleanArtifactsPerJob flag is evaluated each time the callback runs, not captured here.
        ThrowingConsumer<Exception, String> removeStagedArtifacts = stagingSessionToken -> {
            if (this.configuration.cleanArtifactsPerJob) {
                stagingServer.getService().removeStagedArtifacts(stagingSessionToken);
            }
        };
        return InMemoryJobService.create(stagingServer, this::createSessionToken, removeStagedArtifacts, invoker, this.configuration.getMaxInvocationHistory());
    }

    /**
     * Builds the {@link ServerFactory} for the job service.
     *
     * <p>The port supplier reads {@code configuration.port} every time it is invoked; the value
     * is not captured when the factory is created.
     *
     * @param configuration the configuration whose job port the factory should use
     * @return a server factory bound to the configured job port
     */
    protected static @UnknownKeyFor @NonNull @Initialized ServerFactory createJobServerFactory(@UnknownKeyFor @NonNull @Initialized ServerConfiguration configuration) {
        return ServerFactory.createWithPortSupplier(() -> {
            return configuration.port;
        });
    }

    /**
     * Builds the {@link ServerFactory} for the artifact staging service.
     *
     * <p>The port supplier reads {@code configuration.artifactPort} every time it is invoked;
     * the value is not captured when the factory is created.
     *
     * @param configuration the configuration whose artifact port the factory should use
     * @return a server factory bound to the configured artifact port
     */
    protected static @UnknownKeyFor @NonNull @Initialized ServerFactory createArtifactServerFactory(@UnknownKeyFor @NonNull @Initialized ServerConfiguration configuration) {
        return ServerFactory.createWithPortSupplier(() -> {
            return configuration.artifactPort;
        });
    }

    /**
     * Stores the collaborators needed to bring up the job server. No server is created here;
     * that happens in {@link #start()} or {@link #run()}.
     *
     * @param configuration host, port and staging settings for the servers
     * @param jobServerFactory factory used when creating the job service server
     * @param artifactServerFactory factory used when creating the artifact staging server
     * @param jobInvokerFactory source of the {@link JobInvoker} given to the job service
     */
    protected JobServerDriver(@UnknownKeyFor @NonNull @Initialized ServerConfiguration configuration, @UnknownKeyFor @NonNull @Initialized ServerFactory jobServerFactory, @UnknownKeyFor @NonNull @Initialized ServerFactory artifactServerFactory, @UnknownKeyFor @NonNull @Initialized JobInvokerFactory jobInvokerFactory) {
        this.jobInvokerFactory = jobInvokerFactory;
        this.artifactServerFactory = artifactServerFactory;
        this.jobServerFactory = jobServerFactory;
        this.configuration = configuration;
    }

    /**
     * Reports the address of the job server, which also tells the caller whether one is
     * currently registered with this driver.
     *
     * @return the URL from the job server's API service descriptor, or {@code null} when no job
     *     server is set (not started yet, or already stopped)
     */
    public @UnknownKeyFor @NonNull @Initialized String getJobServerUrl() {
        // Read the volatile field exactly once. stop() may reset it to null on another thread, so
        // checking the field and then dereferencing it again could throw a NullPointerException.
        GrpcFnServer<InMemoryJobService> server = this.jobServer;
        if (server == null) {
            return null;
        }
        return server.getApiServiceDescriptor().getUrl();
    }

    /**
     * Creates the job server (and the servers it depends on) and returns its address without
     * waiting for termination.
     *
     * <p>This method does not call {@link #stop()} when creation fails; servers that were already
     * started stay referenced by their fields until {@code stop()} is invoked.
     *
     * @return the URL from the new job server's API service descriptor
     * @throws IOException if the job server or one of its supporting servers cannot be created
     */
    public @UnknownKeyFor @NonNull @Initialized String start() throws @UnknownKeyFor @NonNull @Initialized IOException {
        // Use the local reference for the return value: the volatile field can be reset to null
        // by a concurrent stop() right after the assignment.
        GrpcFnServer<InMemoryJobService> server = this.createJobServer();
        this.jobServer = server;
        return server.getApiServiceDescriptor().getUrl();
    }

    /**
     * Creates the job server, waits on its {@code awaitTermination()} call, and finally stops all
     * servers owned by this driver.
     *
     * <p>Exceptions are logged rather than propagated. If the wait is interrupted, the thread's
     * interrupt status is restored before this method returns so callers can observe it.
     */
    @Override
    public void run() {
        boolean interrupted = false;
        try {
            // Work with the local reference; the volatile field may be reset by a concurrent stop().
            GrpcFnServer<InMemoryJobService> server = this.createJobServer();
            this.jobServer = server;
            LOG.info("Job server now running, terminate with Ctrl+C");
            server.getServer().awaitTermination();
        }
        catch (InterruptedException e) {
            interrupted = true;
            LOG.warn("Job server interrupted", e);
        }
        catch (Exception e) {
            LOG.warn("Exception during job server creation", e);
        }
        finally {
            try {
                this.stop();
            }
            finally {
                // Restore the flag only after stop() has run, so the shutdown sequence executes
                // with the same (cleared) interrupt status it had before this change.
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Closes the job server, the artifact staging server and the expansion server, in that order.
     *
     * <p>Each server is handled independently: a failure while closing one is logged and does not
     * prevent the remaining ones from being closed. Servers that are not set are skipped.
     */
    public synchronized void stop() {
        this.closeJobServer();
        this.closeArtifactStagingServer();
        this.closeExpansionServer();
    }

    /** Closes the job server if one is set; the field is cleared only when no exception occurs. */
    private void closeJobServer() {
        if (this.jobServer == null) {
            return;
        }
        try {
            this.jobServer.close();
            LOG.info("JobServer stopped on {}", this.jobServer.getApiServiceDescriptor().getUrl());
            this.jobServer = null;
        }
        catch (Exception e) {
            LOG.error("Error while closing the jobServer.", e);
        }
    }

    /** Closes the artifact staging server if one is set; the field is cleared only when no exception occurs. */
    private void closeArtifactStagingServer() {
        if (this.artifactStagingServer == null) {
            return;
        }
        try {
            this.artifactStagingServer.close();
            LOG.info("ArtifactStagingServer stopped on {}", this.artifactStagingServer.getApiServiceDescriptor().getUrl());
            this.artifactStagingServer = null;
        }
        catch (Exception e) {
            LOG.error("Error while closing the artifactStagingServer.", e);
        }
    }

    /** Closes the expansion server if one is set; the field is cleared only when no exception occurs. */
    private void closeExpansionServer() {
        if (this.expansionServer == null) {
            return;
        }
        try {
            this.expansionServer.close();
            LOG.info("Expansion stopped on {}:{}", this.expansionServer.getHost(), this.expansionServer.getPort());
            this.expansionServer = null;
        }
        catch (Exception e) {
            LOG.error("Error while closing the Expansion Service.", e);
        }
    }

    /**
     * Derives a token from a session identifier. createJobService() passes this method to
     * {@code InMemoryJobService.create} as a method reference.
     *
     * <p>This base implementation returns {@code session} unchanged. The method is protected and
     * non-final, so subclasses can supply a different mapping.
     *
     * @param session the session identifier
     * @return the token for that session; here, the identifier itself
     */
    protected @UnknownKeyFor @NonNull @Initialized String createSessionToken(@UnknownKeyFor @NonNull @Initialized String session) {
        return session;
    }

    /**
     * Creates the job service and exposes it through a gRPC server.
     *
     * <p>A configured job port of {@code 0} lets {@link GrpcFnServer} allocate the port; any other
     * value makes the server use the endpoint {@code host:port} from the configuration.
     *
     * @return the server hosting the job service
     * @throws IOException if the job service or its server cannot be created
     */
    private @UnknownKeyFor @NonNull @Initialized GrpcFnServer<@UnknownKeyFor @NonNull @Initialized InMemoryJobService> createJobServer() throws @UnknownKeyFor @NonNull @Initialized IOException {
        InMemoryJobService jobService = this.createJobService();
        GrpcFnServer<InMemoryJobService> started;
        if (this.configuration.port != 0) {
            // Fixed port: describe the endpoint explicitly.
            String url = this.configuration.host + ":" + this.configuration.port;
            Endpoints.ApiServiceDescriptor endpoint = Endpoints.ApiServiceDescriptor.newBuilder().setUrl(url).build();
            started = GrpcFnServer.create(jobService, endpoint, this.jobServerFactory);
        } else {
            // Port 0: delegate port selection to GrpcFnServer.
            started = GrpcFnServer.allocatePortAndCreateFor(jobService, this.jobServerFactory);
        }
        LOG.info("JobService started on {}", started.getApiServiceDescriptor().getUrl());
        return started;
    }

    /**
     * Creates the artifact staging service, backed by the configured staging path, and exposes it
     * through a gRPC server.
     *
     * <p>A configured artifact port of {@code 0} lets {@link GrpcFnServer} allocate the port; any
     * other value makes the server use the endpoint {@code host:artifactPort} from the
     * configuration.
     *
     * @return the server hosting the artifact staging service
     * @throws IOException if the server cannot be created
     */
    private @UnknownKeyFor @NonNull @Initialized GrpcFnServer<@UnknownKeyFor @NonNull @Initialized ArtifactStagingService> createArtifactStagingService() throws @UnknownKeyFor @NonNull @Initialized IOException {
        ArtifactStagingService stagingService = new ArtifactStagingService(ArtifactStagingService.beamFilesystemArtifactDestinationProvider(this.configuration.artifactStagingPath));
        GrpcFnServer<ArtifactStagingService> started;
        if (this.configuration.artifactPort != 0) {
            // Fixed port: describe the endpoint explicitly.
            String url = this.configuration.host + ":" + this.configuration.artifactPort;
            Endpoints.ApiServiceDescriptor endpoint = Endpoints.ApiServiceDescriptor.newBuilder().setUrl(url).build();
            started = GrpcFnServer.create(stagingService, endpoint, this.artifactServerFactory);
        } else {
            // Port 0: delegate port selection to GrpcFnServer.
            started = GrpcFnServer.allocatePortAndCreateFor(stagingService, this.artifactServerFactory);
        }
        LOG.info("ArtifactStagingService started on {}", started.getApiServiceDescriptor().getUrl());
        return started;
    }

    /**
     * Creates the Java expansion server on the configured host and expansion port.
     *
     * @return the started expansion server, or {@code null} when the configured expansion port
     *     is negative (expansion service disabled)
     * @throws IOException if the expansion server cannot be created
     */
    @Nullable
    private @UnknownKeyFor @org.checkerframework.checker.nullness.qual.Nullable @Initialized ExpansionServer createExpansionService() throws @UnknownKeyFor @NonNull @Initialized IOException {
        int port = this.configuration.expansionPort;
        if (port >= 0) {
            ExpansionServer started = ExpansionServer.create(new ExpansionService(), this.configuration.host, port);
            LOG.info("Java ExpansionService started on {}:{}", started.getHost(), started.getPort());
            return started;
        }
        // Negative port: no expansion server is created.
        return null;
    }

    /**
     * Settings for the servers started by the enclosing driver.
     *
     * <p>Every field carries an args4j {@code @Option} annotation naming its command-line flag,
     * and is initialized to the default used when the flag is not supplied. The fields are
     * private; the enclosing class reads some of them directly, other code uses the getters.
     */
    public static class ServerConfiguration {
        // Host name used when building the "host:port" endpoint URLs and for the expansion server.
        @Option(name="--job-host", usage="The job server host name")
        private @UnknownKeyFor @NonNull @Initialized String host = "localhost";
        // Job service port; 0 requests a dynamically allocated port.
        @Option(name="--job-port", usage="The job service port. 0 to use a dynamic port. (Default: 8099)")
        private @UnknownKeyFor @NonNull @Initialized int port = 8099;
        // Artifact staging service port; 0 requests a dynamically allocated port.
        @Option(name="--artifact-port", usage="The artifact service port. 0 to use a dynamic port. (Default: 8098)")
        private @UnknownKeyFor @NonNull @Initialized int artifactPort = 8098;
        // Expansion service port; a negative value disables creation of the expansion service.
        @Option(name="--expansion-port", usage="The Java expansion service port. 0 to use a dynamic port, negative number to disable expansion service creation (Default: 8097)")
        private @UnknownKeyFor @NonNull @Initialized int expansionPort = 8097;
        // Staging location; defaults to "beam-artifact-staging" under the java.io.tmpdir directory.
        @Option(name="--artifacts-dir", usage="The location to store staged artifact files. If artifact staging is needed, this directory must be accessible by the execution engine's workers.")
        private @UnknownKeyFor @NonNull @Initialized String artifactStagingPath = Paths.get(System.getProperty("java.io.tmpdir"), "beam-artifact-staging").toString();
        // Whether a job's staged artifacts are removed by the job service's cleanup callback.
        // Parsed with ExplicitBooleanOptionHandler.
        @Option(name="--clean-artifacts-per-job", usage="When true, remove each job's staged artifacts when it completes", handler=ExplicitBooleanOptionHandler.class)
        private @UnknownKeyFor @NonNull @Initialized boolean cleanArtifactsPerJob = true;
        // Upper bound on retained completed jobs, passed to InMemoryJobService.create.
        @Option(name="--history-size", usage="The maximum number of completed jobs to keep.")
        private @UnknownKeyFor @NonNull @Initialized int maxInvocationHistory = 10;

        /** Returns the configured host name (default {@code "localhost"}). */
        public @UnknownKeyFor @NonNull @Initialized String getHost() {
            return this.host;
        }

        /** Returns the configured job service port (default 8099). */
        public @UnknownKeyFor @NonNull @Initialized int getPort() {
            return this.port;
        }

        /** Returns the configured artifact staging service port (default 8098). */
        public @UnknownKeyFor @NonNull @Initialized int getArtifactPort() {
            return this.artifactPort;
        }

        /** Returns the configured expansion service port (default 8097); negative means disabled. */
        public @UnknownKeyFor @NonNull @Initialized int getExpansionPort() {
            return this.expansionPort;
        }

        /** Returns the location where staged artifact files are stored. */
        public @UnknownKeyFor @NonNull @Initialized String getArtifactStagingPath() {
            return this.artifactStagingPath;
        }

        /** Returns whether per-job artifact cleanup is enabled (default {@code true}). */
        public @UnknownKeyFor @NonNull @Initialized boolean isCleanArtifactsPerJob() {
            return this.cleanArtifactsPerJob;
        }

        /** Returns the maximum number of completed jobs to keep (default 10). */
        public @UnknownKeyFor @NonNull @Initialized int getMaxInvocationHistory() {
            return this.maxInvocationHistory;
        }
    }

    /**
     * Supplies the {@link JobInvoker} used by the job service. The driver calls {@link #create()}
     * each time it builds a job service.
     */
    public interface JobInvokerFactory {
        /**
         * Creates a job invoker.
         *
         * @return the invoker to hand to the job service
         */
        @UnknownKeyFor @NonNull @Initialized JobInvoker create();
    }
}

