package org.apache.flink.streaming.api.datastream;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.util.Preconditions;

@Experimental
/* loaded from: input_file:org/apache/flink/streaming/api/datastream/DataStreamUtils.class */
public final class DataStreamUtils {

    /* loaded from: input_file:org/apache/flink/streaming/api/datastream/DataStreamUtils$ClientAndIterator.class */
    public static final class ClientAndIterator<E> {
        public final JobClient client;
        public final Iterator<E> iterator;

        ClientAndIterator(JobClient jobClient, Iterator<E> it) {
            this.client = (JobClient) Preconditions.checkNotNull(jobClient);
            this.iterator = (Iterator) Preconditions.checkNotNull(it);
        }
    }

    public static <OUT> Iterator<OUT> collect(DataStream<OUT> dataStream) {
        return collect(dataStream, "Data Stream Collect");
    }

    public static <OUT> Iterator<OUT> collect(DataStream<OUT> dataStream, String str) {
        try {
            return (Iterator<OUT>) collectWithClient(dataStream, str).iterator;
        } catch (Exception e) {
            throw new RuntimeException("Failed to execute data stream", e);
        }
    }

    /* JADX WARN: Type inference failed for: r0v13, types: [org.apache.flink.streaming.api.operators.collect.CollectResultIterator, java.util.Iterator] */
    public static <OUT> ClientAndIterator<OUT> collectWithClient(DataStream<OUT> dataStream, String str) throws Exception {
        TypeSerializer createSerializer = dataStream.getType().createSerializer(dataStream.getExecutionEnvironment().getConfig());
        String str2 = "dataStreamCollect_" + UUID.randomUUID().toString();
        StreamExecutionEnvironment executionEnvironment = dataStream.getExecutionEnvironment();
        CollectSinkOperatorFactory collectSinkOperatorFactory = new CollectSinkOperatorFactory(createSerializer, str2);
        ?? collectResultIterator = new CollectResultIterator(((CollectSinkOperator) collectSinkOperatorFactory.getOperator()).getOperatorIdFuture(), createSerializer, str2);
        CollectStreamSink collectStreamSink = new CollectStreamSink(dataStream, collectSinkOperatorFactory);
        collectStreamSink.name("Data stream collect sink");
        executionEnvironment.addOperator(collectStreamSink.getTransformation());
        JobClient executeAsync = executionEnvironment.executeAsync(str);
        collectResultIterator.setJobClient(executeAsync);
        return new ClientAndIterator<>(executeAsync, collectResultIterator);
    }

    public static <E> List<E> collectBoundedStream(DataStream<E> dataStream, String str) throws Exception {
        ArrayList arrayList = new ArrayList();
        Iterator<E> it = collectWithClient(dataStream, str).iterator;
        while (it.hasNext()) {
            arrayList.add(it.next());
        }
        arrayList.trimToSize();
        return arrayList;
    }

    public static <E> List<E> collectUnboundedStream(DataStream<E> dataStream, int i, String str) throws Exception {
        ClientAndIterator collectWithClient = collectWithClient(dataStream, str);
        List<E> collectRecordsFromUnboundedStream = collectRecordsFromUnboundedStream(collectWithClient, i);
        collectWithClient.client.cancel().get();
        return collectRecordsFromUnboundedStream;
    }

    public static <E> List<E> collectRecordsFromUnboundedStream(ClientAndIterator<E> clientAndIterator, int i) {
        Preconditions.checkNotNull(clientAndIterator, "client");
        Preconditions.checkArgument(i > 0, "numElement must be > 0");
        ArrayList arrayList = new ArrayList(i);
        Iterator<E> it = clientAndIterator.iterator;
        while (it.hasNext()) {
            arrayList.add(it.next());
            if (arrayList.size() == i) {
                return arrayList;
            }
        }
        throw new IllegalArgumentException(String.format("The stream ended before reaching the requested %d records. Only %d records were received.", Integer.valueOf(i), Integer.valueOf(arrayList.size())));
    }

    public static <T, K> KeyedStream<T, K> reinterpretAsKeyedStream(DataStream<T> dataStream, KeySelector<T, K> keySelector) {
        return reinterpretAsKeyedStream(dataStream, keySelector, TypeExtractor.getKeySelectorTypes(keySelector, dataStream.getType()));
    }

    public static <T, K> KeyedStream<T, K> reinterpretAsKeyedStream(DataStream<T> dataStream, KeySelector<T, K> keySelector, TypeInformation<K> typeInformation) {
        return new KeyedStream<>(dataStream, new PartitionTransformation(dataStream.getTransformation(), new ForwardPartitioner()), keySelector, typeInformation);
    }

    private DataStreamUtils() {
    }
}
