package org.apache.flink.client.program;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.cli.ClientOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.DetachedJobExecutionResult;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.runtime.dispatcher.ConfigurationNotAllowedMessage;
import org.apache.flink.shaded.guava32.com.google.common.collect.MapDifference;
import org.apache.flink.shaded.guava32.com.google.common.collect.Maps;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ShutdownHookUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link StreamExecutionEnvironment} that can be installed as the context environment through
 * {@link #setAsContext}.
 *
 * <p>Before every job submission it can verify that the program did not alter the configuration
 * compared to the cluster configuration (when program configuration is disabled), and it can
 * reject more than one {@code execute()} / {@code executeAsync()} call per environment. Unless
 * suppressed, the job id and the job result are printed to standard out.
 */
@PublicEvolving
public class StreamContextEnvironment extends StreamExecutionEnvironment {
    private static final Logger LOG = LoggerFactory.getLogger(StreamContextEnvironment.class);

    /** When {@code true}, nothing is printed to {@code System.out}. */
    private final boolean suppressSysout;

    /** When {@code true}, a second submission from this environment is rejected. */
    private final boolean enforceSingleJobExecution;

    /** Baseline configuration the environment's effective configuration is compared against. */
    private final Configuration clusterConfiguration;

    /** Number of submissions attempted through this environment so far. */
    private int jobCounter;

    /** When {@code true}, the configuration mutation check is skipped entirely. */
    private final boolean programConfigEnabled;

    /** Keys that are removed from both configurations before they are compared. */
    private final Collection<String> programConfigWildcards;

    /**
     * Creates an environment that uses the same configuration as cluster and as environment
     * configuration, with program configuration enabled and no wildcard keys.
     *
     * @param pipelineExecutorServiceLoader loader handed to the parent environment
     * @param configuration configuration used both as cluster baseline and environment config
     * @param classLoader class loader handed to the parent environment
     * @param z whether to enforce a single job execution per environment
     * @param z2 whether to suppress output on {@code System.out}
     */
    public StreamContextEnvironment(PipelineExecutorServiceLoader pipelineExecutorServiceLoader, Configuration configuration, ClassLoader classLoader, boolean z, boolean z2) {
        this(pipelineExecutorServiceLoader, configuration, configuration, classLoader, z, z2, true, Collections.emptyList());
    }

    /**
     * Creates an environment with a separate cluster baseline and environment configuration.
     *
     * @param pipelineExecutorServiceLoader loader handed to the parent environment
     * @param configuration cluster configuration used as baseline for the mutation check
     * @param configuration2 configuration handed to the parent environment
     * @param classLoader class loader handed to the parent environment
     * @param z whether to enforce a single job execution per environment
     * @param z2 whether to suppress output on {@code System.out}
     * @param z3 whether program configuration is enabled (disables the mutation check)
     * @param collection keys excluded from the mutation check
     */
    @Internal
    public StreamContextEnvironment(PipelineExecutorServiceLoader pipelineExecutorServiceLoader, Configuration configuration, Configuration configuration2, ClassLoader classLoader, boolean z, boolean z2, boolean z3, Collection<String> collection) {
        super(pipelineExecutorServiceLoader, configuration2, classLoader);
        this.suppressSysout = z2;
        this.enforceSingleJobExecution = z;
        this.clusterConfiguration = configuration;
        this.jobCounter = 0;
        this.programConfigEnabled = z3;
        this.programConfigWildcards = collection;
    }

    /**
     * Submits the given graph, obtains the job result and notifies all registered job listeners
     * about the outcome.
     *
     * @param streamGraph the graph to execute
     * @return the job result; a {@link DetachedJobExecutionResult} when not running attached
     * @throws Exception if submission fails or obtaining the result fails; listeners are notified
     *     with the stripped failure cause before it is rethrown
     */
    @Override
    public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
        final JobClient jobClient = executeAsync(streamGraph);
        final List<JobListener> jobListeners = getJobListeners();
        try {
            final JobExecutionResult result = getJobExecutionResult(jobClient);
            jobListeners.forEach(listener -> listener.onJobExecuted(result, null));
            return result;
        } catch (Throwable t) {
            jobListeners.forEach(
                    listener ->
                            listener.onJobExecuted(
                                    null, ExceptionUtils.stripExecutionException(t)));
            ExceptionUtils.rethrowException(t);
            // Not expected to be reached; present so that every path returns a value.
            return null;
        }
    }

    /**
     * Produces the result for a submitted job.
     *
     * <p>When {@link DeploymentOptions#ATTACHED} is set, this blocks until the job result future
     * completes. If additionally {@link DeploymentOptions#SHUTDOWN_IF_ATTACHED} is set, a shutdown
     * hook that cancels the job is registered for the duration of the job and a periodic client
     * heartbeat is started. Otherwise a {@link DetachedJobExecutionResult} is returned right away.
     *
     * @param jobClient client of the submitted job, must not be {@code null}
     * @return the job's execution result
     * @throws Exception if waiting for the job result fails
     */
    private JobExecutionResult getJobExecutionResult(JobClient jobClient) throws Exception {
        Preconditions.checkNotNull(jobClient);

        if (!this.configuration.get(DeploymentOptions.ATTACHED)) {
            return new DetachedJobExecutionResult(jobClient.getJobID());
        }

        final CompletableFuture<JobExecutionResult> resultFuture = jobClient.getJobExecutionResult();
        ScheduledExecutorService clientHeartbeatService = null;

        if (this.configuration.get(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) {
            final String hookName = StreamContextEnvironment.class.getSimpleName();
            final Thread shutdownHook =
                    ShutdownHookUtil.addShutdownHook(
                            () -> {
                                // Bounded wait so that the hook cannot block for more than a second.
                                jobClient.cancel().get(1L, TimeUnit.SECONDS);
                            },
                            hookName,
                            LOG);
            // The hook is only needed while the job is running.
            resultFuture.whenComplete(
                    (ignored, throwable) ->
                            ShutdownHookUtil.removeShutdownHook(shutdownHook, hookName, LOG));
            clientHeartbeatService =
                    ClientUtils.reportHeartbeatPeriodically(
                            jobClient,
                            this.configuration.get(ClientOptions.CLIENT_HEARTBEAT_INTERVAL).toMillis(),
                            this.configuration.get(ClientOptions.CLIENT_HEARTBEAT_TIMEOUT).toMillis());
        }

        final JobExecutionResult result;
        try {
            result = resultFuture.get();
        } finally {
            // Stop the heartbeat on failure as well, so the scheduler is not leaked.
            if (clientHeartbeatService != null) {
                clientHeartbeatService.shutdown();
            }
        }

        if (!this.suppressSysout) {
            System.out.println(result);
        }
        return result;
    }

    /**
     * Validates configuration and submission count, then submits the graph through the parent
     * environment.
     *
     * @param streamGraph the graph to submit
     * @return the client of the submitted job
     * @throws Exception if validation or submission fails
     */
    @Override
    public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
        checkNotAllowedConfigurations();
        validateAllowedExecution();
        final JobClient jobClient = super.executeAsync(streamGraph);
        if (!this.suppressSysout) {
            System.out.println("Job has been submitted with JobID " + jobClient.getJobID());
        }
        return jobClient;
    }

    /**
     * Counts this submission attempt and rejects it if single job execution is enforced and a
     * previous attempt was already counted.
     */
    private void validateAllowedExecution() {
        if (this.enforceSingleJobExecution && this.jobCounter > 0) {
            throw new FlinkRuntimeException("Cannot have more than one execute() or executeAsync() call in a single environment.");
        }
        this.jobCounter++;
    }

    /**
     * Registers a factory that creates {@link StreamContextEnvironment} instances as the context
     * environment.
     *
     * <p>For each created environment the effective configuration is the cluster configuration
     * overlaid with the configuration passed to the factory.
     *
     * @param pipelineExecutorServiceLoader loader handed to created environments
     * @param configuration cluster configuration; also supplies the program-config options
     * @param classLoader class loader handed to created environments
     * @param z whether to enforce a single job execution per environment
     * @param z2 whether to suppress output on {@code System.out}
     */
    public static void setAsContext(PipelineExecutorServiceLoader pipelineExecutorServiceLoader, Configuration configuration, ClassLoader classLoader, boolean z, boolean z2) {
        initializeContextEnvironment(envInitConfig -> {
            final boolean programConfigEnabled = configuration.get(DeploymentOptions.PROGRAM_CONFIG_ENABLED);
            final List<String> programConfigWildcards = configuration.get(DeploymentOptions.PROGRAM_CONFIG_WILDCARDS);
            // Later addAll calls win, so factory-supplied settings override cluster settings.
            final Configuration mergedEnvConfig = new Configuration();
            mergedEnvConfig.addAll(configuration);
            mergedEnvConfig.addAll(envInitConfig);
            return new StreamContextEnvironment(pipelineExecutorServiceLoader, configuration, mergedEnvConfig, classLoader, z, z2, programConfigEnabled, programConfigWildcards);
        });
    }

    /** Resets the context environment. */
    public static void unsetAsContext() {
        resetContextEnvironment();
    }

    /**
     * Fails if the environment configuration deviates from the cluster configuration in a way
     * that is not allowed.
     *
     * @throws MutatedConfigurationException listing every disallowed deviation
     */
    private void checkNotAllowedConfigurations() throws MutatedConfigurationException {
        final Collection<String> errors = collectNotAllowedConfigurations();
        if (!errors.isEmpty()) {
            throw new MutatedConfigurationException(errors);
        }
    }

    /**
     * Collects one message per disallowed configuration deviation.
     *
     * @return the messages; empty when program configuration is enabled or nothing deviates
     */
    private Collection<String> collectNotAllowedConfigurations() {
        if (this.programConfigEnabled) {
            return Collections.emptyList();
        }
        final List<String> errors = new ArrayList<>();
        // Work on a copy so that stripping wildcard keys does not touch the cluster configuration.
        final Configuration clusterConfigCopy = new Configuration(this.clusterConfiguration);
        removeProgramConfigWildcards(clusterConfigCopy);
        checkMainConfiguration(clusterConfigCopy, errors);
        return errors;
    }

    /**
     * Compares the given baseline with a wildcard-stripped copy of this environment's
     * configuration and records added, removed and changed entries.
     *
     * @param configuration baseline configuration (left side of the comparison)
     * @param list receives one message per difference
     */
    private void checkMainConfiguration(Configuration configuration, List<String> list) {
        final Configuration envConfigCopy = new Configuration(this.configuration);
        removeProgramConfigWildcards(envConfigCopy);
        final MapDifference<String, String> diff = Maps.difference(configuration.toMap(), envConfigCopy.toMap());
        diff.entriesOnlyOnRight().forEach((key, value) -> list.add(ConfigurationNotAllowedMessage.ofConfigurationAdded(key, value)));
        diff.entriesOnlyOnLeft().forEach((key, value) -> list.add(ConfigurationNotAllowedMessage.ofConfigurationRemoved(key, value)));
        diff.entriesDiffering().forEach((key, change) -> list.add(ConfigurationNotAllowedMessage.ofConfigurationChanged(key, change)));
    }

    /**
     * Compares two configurations belonging to a named configuration object and records added,
     * changed and removed entries.
     *
     * <p>Note that the wildcard keys are removed from {@code configuration2} in place. This
     * method is currently not called from within this class.
     *
     * @param configuration baseline configuration (left side of the comparison)
     * @param configuration2 configuration to check (right side); modified by this call
     * @param str name of the configuration object, used in the messages
     * @param list receives one message per difference
     */
    private void checkConfigurationObject(Configuration configuration, Configuration configuration2, String str, List<String> list) {
        removeProgramConfigWildcards(configuration2);
        final MapDifference<String, String> diff = Maps.difference(configuration.toMap(), configuration2.toMap());
        diff.entriesOnlyOnRight().forEach((key, value) -> list.add(ConfigurationNotAllowedMessage.ofConfigurationObjectAdded(str, key, value)));
        diff.entriesDiffering().forEach((key, change) -> list.add(ConfigurationNotAllowedMessage.ofConfigurationObjectChanged(str, key, change)));
        diff.entriesOnlyOnLeft().forEach((key, value) -> list.add(ConfigurationNotAllowedMessage.ofConfigurationObjectRemoved(str, key, value)));
    }

    /**
     * Removes every configured wildcard key from the given configuration.
     *
     * @param configuration configuration to strip; modified in place
     */
    private void removeProgramConfigWildcards(Configuration configuration) {
        for (String key : this.programConfigWildcards) {
            configuration.removeKey(key);
        }
    }
}
