package org.apache.flink.streaming.api.environment;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import java.time.Duration;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.configuration.StateChangelogOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.streaming.api.functions.sink.legacy.DiscardingSink;
import org.apache.flink.util.TernaryBoolean;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Tests that {@link StreamExecutionEnvironment} picks up non-trivial settings (cached files, Kryo
 * serializers, job listeners, changelog state backend, parallelism) from a {@link Configuration}.
 */
class StreamExecutionEnvironmentComplexConfigurationTest {

    /* loaded from: input_file:org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentComplexConfigurationTest$BasicJobExecutedCounter.class */
    public static class BasicJobExecutedCounter implements JobListener {
        private int count = 0;

        public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable th) {
            this.count++;
        }

        public void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable th) {
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentComplexConfigurationTest$BasicJobSubmittedCounter.class */
    public static class BasicJobSubmittedCounter implements JobListener {
        private int count = 0;

        public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable th) {
            this.count++;
        }

        public void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable th) {
        }
    }

    /** Empty marker type used as the key of the Kryo serializer registration under test. */
    public static class CustomPojo {}

    /* loaded from: input_file:org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentComplexConfigurationTest$CustomPojoSerializer.class */
    public static class CustomPojoSerializer extends Serializer<CustomPojo> {
        public void write(Kryo kryo, Output output, CustomPojo customPojo) {
        }

        public CustomPojo read(Kryo kryo, Input input, Class<CustomPojo> cls) {
            return null;
        }

        /* renamed from: read, reason: collision with other method in class */
        public /* bridge */ /* synthetic */ Object m2read(Kryo kryo, Input input, Class cls) {
            return read(kryo, input, (Class<CustomPojo>) cls);
        }
    }

    /** Package-private no-argument constructor; performs no setup. */
    StreamExecutionEnvironmentComplexConfigurationTest() {}

    /**
     * Verifies that state backend, checkpoint storage and restart strategy options applied to the
     * environment are present in the job configuration of the execution graph built from its job.
     *
     * @throws Exception if building the scheduler or execution graph fails
     */
    @Test
    void testJobConfigFromEnvToExecutionGraph() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Configuration configuration = new Configuration();
        configuration.set(StateBackendOptions.STATE_BACKEND, "hashmap");
        configuration.set(CheckpointingOptions.CHECKPOINT_STORAGE, "jobmanager");
        configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file:///valid");
        configuration.set(
                RestartStrategyOptions.RESTART_STRATEGY,
                RestartStrategyOptions.RestartStrategyType.EXPONENTIAL_DELAY.getMainValue());
        env.configure(configuration, Thread.currentThread().getContextClassLoader());
        env.fromSequence(0L, 1L).addSink(new DiscardingSink<>());

        // Keep a handle on the executor handed to the scheduler builder so that its worker
        // thread is stopped when the test finishes instead of leaking past the test.
        ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        try {
            Configuration jobConfiguration =
                    new DefaultSchedulerBuilder(
                                    env.getStreamGraph().getJobGraph(),
                                    ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                                    scheduledExecutor)
                            .build()
                            .getExecutionGraph()
                            .getJobConfiguration();

            Assertions.assertThat(jobConfiguration.get(StateBackendOptions.STATE_BACKEND))
                    .isEqualTo(configuration.get(StateBackendOptions.STATE_BACKEND));
            Assertions.assertThat(jobConfiguration.get(CheckpointingOptions.CHECKPOINT_STORAGE))
                    .isEqualTo(configuration.get(CheckpointingOptions.CHECKPOINT_STORAGE));
            Assertions.assertThat(jobConfiguration.get(RestartStrategyOptions.RESTART_STRATEGY))
                    .isEqualTo(configuration.get(RestartStrategyOptions.RESTART_STRATEGY));
        } finally {
            scheduledExecutor.shutdownNow();
        }
    }

    /**
     * Verifies that cached files declared under {@code pipeline.cached-files} are parsed into
     * name/entry pairs and that the resulting list no longer contains the file that was
     * registered programmatically before {@code configure} was called.
     */
    @Test
    void testLoadingCachedFilesFromConfiguration() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.registerCachedFile("/tmp4", "file4", true);

        Configuration cachedFilesConfig = new Configuration();
        cachedFilesConfig.setString(
                "pipeline.cached-files",
                "name:file1,path:/tmp1,executable:true;name:file2,path:/tmp2;name:file3,path:'oss://bucket/file1'");
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        env.configure(cachedFilesConfig, classLoader);

        List<Tuple2<String, DistributedCache.DistributedCacheEntry>> expected =
                Arrays.asList(
                        Tuple2.of("file1", new DistributedCache.DistributedCacheEntry("/tmp1", true)),
                        Tuple2.of("file2", new DistributedCache.DistributedCacheEntry("/tmp2", false)),
                        Tuple2.of(
                                "file3",
                                new DistributedCache.DistributedCacheEntry("oss://bucket/file1", false)));
        Assertions.assertThat(env.getCachedFiles()).isEqualTo(expected);
    }

    /**
     * Verifies that a default Kryo serializer declared under {@code pipeline.serialization-config}
     * is exposed by the environment's serializer config as a type-to-serializer-class mapping.
     */
    @Test
    void testLoadingKryoSerializersFromConfiguration() {
        Configuration configuration = new Configuration();
        configuration.setString(
                "pipeline.serialization-config",
                "{org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentComplexConfigurationTest$CustomPojo: {type: kryo, kryo-type: default, class: org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentComplexConfigurationTest$CustomPojoSerializer}}");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);

        // Parameterized instead of raw; map equality does not depend on the type arguments.
        LinkedHashMap<Class<?>, Class<?>> expectedSerializers = new LinkedHashMap<>();
        expectedSerializers.put(CustomPojo.class, CustomPojoSerializer.class);

        Assertions.assertThat(env.getConfig().getSerializerConfig().getDefaultKryoSerializerClasses())
                .isEqualTo(expectedSerializers);
    }

    /**
     * Verifies how the changelog state backend flag follows the configuration: it starts as
     * {@code UNDEFINED}, becomes {@code TRUE} once enabled, stays {@code TRUE} when the same
     * configuration is applied again, and becomes {@code FALSE} once disabled.
     */
    @Test
    void testOverridingChangelogStateBackendWithFromConfigurationWhenSet() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();

        // The value under test goes into assertThat(...) so that a failure reports it as the
        // actual value and the constant as the expectation.
        Assertions.assertThat(env.isChangelogStateBackendEnabled()).isEqualTo(TernaryBoolean.UNDEFINED);

        Configuration configuration = new Configuration();
        configuration.set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, true);
        env.configure(configuration, classLoader);
        Assertions.assertThat(env.isChangelogStateBackendEnabled()).isEqualTo(TernaryBoolean.TRUE);

        // Applying the identical configuration a second time must not change the flag.
        env.configure(configuration, classLoader);
        Assertions.assertThat(env.isChangelogStateBackendEnabled()).isEqualTo(TernaryBoolean.TRUE);

        configuration.set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, false);
        env.configure(configuration, classLoader);
        Assertions.assertThat(env.isChangelogStateBackendEnabled()).isEqualTo(TernaryBoolean.FALSE);
    }

    /**
     * Verifies that applying a configuration without any cached-files entry leaves a
     * programmatically registered cached file in place.
     */
    @Test
    void testNotOverridingCachedFilesFromConfiguration() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.registerCachedFile("/tmp3", "file3", true);

        Configuration emptyConfig = new Configuration();
        env.configure(emptyConfig, Thread.currentThread().getContextClassLoader());

        List<Tuple2<String, DistributedCache.DistributedCacheEntry>> expected =
                Arrays.asList(Tuple2.of("file3", new DistributedCache.DistributedCacheEntry("/tmp3", true)));
        Assertions.assertThat(env.getCachedFiles()).isEqualTo(expected);
    }

    /**
     * Verifies that job listener class names stored under {@link DeploymentOptions#JOB_LISTENERS}
     * are instantiated by {@code configure} and exposed in the order in which they were listed.
     */
    @Test
    void testLoadingListenersFromConfiguration() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        List<Class<? extends JobListener>> listenerClasses =
                Arrays.asList(BasicJobSubmittedCounter.class, BasicJobExecutedCounter.class);

        Configuration configuration = new Configuration();
        // Listeners are configured by fully qualified class name.
        ConfigUtils.encodeCollectionToConfig(
                configuration, DeploymentOptions.JOB_LISTENERS, listenerClasses, Class::getName);
        env.configure(configuration, Thread.currentThread().getContextClassLoader());

        Assertions.assertThat(env.getJobListeners()).hasSize(2);
        Assertions.assertThat(env.getJobListeners().get(0)).isInstanceOf(BasicJobSubmittedCounter.class);
        Assertions.assertThat(env.getJobListeners().get(1)).isInstanceOf(BasicJobExecutedCounter.class);
    }

    /**
     * Verifies that an environment obtained with a configuration reflects the configured default
     * parallelism and auto-watermark interval.
     */
    @Test
    void testGettingEnvironmentWithConfiguration() {
        int parallelism = 10;
        Duration watermarkInterval = Duration.ofMillis(100L);

        Configuration configuration = new Configuration();
        configuration.set(CoreOptions.DEFAULT_PARALLELISM, parallelism);
        configuration.set(PipelineOptions.AUTO_WATERMARK_INTERVAL, watermarkInterval);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);

        Assertions.assertThat(env.getParallelism()).isEqualTo(parallelism);
        Assertions.assertThat(env.getConfig().getAutoWatermarkInterval())
                .isEqualTo(watermarkInterval.toMillis());
    }

    /**
     * Verifies that a local environment created with an explicit parallelism uses that value
     * rather than the configured default, while still taking the auto-watermark interval from the
     * configuration.
     */
    @Test
    void testLocalEnvironmentExplicitParallelism() {
        int explicitParallelism = 2;
        Duration watermarkInterval = Duration.ofMillis(100L);

        Configuration configuration = new Configuration();
        configuration.set(CoreOptions.DEFAULT_PARALLELISM, 10);
        configuration.set(PipelineOptions.AUTO_WATERMARK_INTERVAL, watermarkInterval);

        LocalStreamEnvironment localEnv =
                StreamExecutionEnvironment.createLocalEnvironment(explicitParallelism, configuration);

        Assertions.assertThat(localEnv.getParallelism()).isEqualTo(explicitParallelism);
        Assertions.assertThat(localEnv.getConfig().getAutoWatermarkInterval())
                .isEqualTo(watermarkInterval.toMillis());
    }
}
