/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.operators;

import java.io.IOException;
import java.net.URI;
import java.util.List;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.MapPartitionOperator;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

public class RemoteEnvironmentITCase
extends TestLogger {
    private static final int TM_SLOTS = 4;
    private static final int USER_DOP = 2;
    private static final String VALID_STARTUP_TIMEOUT = "100 s";
    @ClassRule
    public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(new MiniClusterResourceConfiguration.Builder().setNumberSlotsPerTaskManager(4).build());

    @Test
    public void testUserSpecificParallelism() throws Exception {
        Configuration config = new Configuration();
        config.setString(AkkaOptions.STARTUP_TIMEOUT, VALID_STARTUP_TIMEOUT);
        URI restAddress = MINI_CLUSTER_RESOURCE.getRestAddress();
        String hostname = restAddress.getHost();
        int port = restAddress.getPort();
        ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment((String)hostname, (int)port, (Configuration)config, (String[])new String[0]);
        env.setParallelism(2);
        MapPartitionOperator result = env.createInput((InputFormat)new ParallelismDependentInputFormat()).rebalance().mapPartition((MapPartitionFunction)new RichMapPartitionFunction<Integer, Integer>(){

            public void mapPartition(Iterable<Integer> values, Collector<Integer> out) throws Exception {
                out.collect((Object)this.getRuntimeContext().getIndexOfThisSubtask());
            }
        });
        List resultCollection = result.collect();
        Assert.assertEquals((long)2L, (long)resultCollection.size());
    }

    private static class ParallelismDependentInputFormat
    extends GenericInputFormat<Integer> {
        private transient boolean emitted;

        private ParallelismDependentInputFormat() {
        }

        public GenericInputSplit[] createInputSplits(int numSplits) throws IOException {
            Assert.assertEquals((long)2L, (long)numSplits);
            return super.createInputSplits(numSplits);
        }

        public boolean reachedEnd() {
            return this.emitted;
        }

        public Integer nextRecord(Integer reuse) {
            if (this.emitted) {
                return null;
            }
            this.emitted = true;
            return 1;
        }
    }
}

