package org.apache.flink.streaming.api.operators.collect;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.api.operators.collect.utils.AbstractTestCoordinationRequestHandler;
import org.apache.flink.streaming.api.operators.collect.utils.TestCheckpointedCoordinationRequestHandler;
import org.apache.flink.streaming.api.operators.collect.utils.TestJobClient;
import org.apache.flink.streaming.api.operators.collect.utils.TestUncheckpointedCoordinationRequestHandler;
import org.apache.flink.util.OptionalFailure;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/collect/CollectResultIteratorTest.class */
public class CollectResultIteratorTest extends TestLogger {
    private final TypeSerializer<Integer> serializer = IntSerializer.INSTANCE;
    private static final OperatorID TEST_OPERATOR_ID = new OperatorID();
    private static final JobID TEST_JOB_ID = new JobID();
    private static final String ACCUMULATOR_NAME = "accumulatorName";

    @Test
    public void testUncheckpointedIterator() throws Exception {
        Random random = new Random();
        for (int i = 200; i > 0; i--) {
            ArrayList arrayList = new ArrayList();
            for (int i2 = 0; i2 < 200; i2++) {
                arrayList.add(Integer.valueOf(i2));
            }
            CollectResultIterator collectResultIterator = (CollectResultIterator) createIteratorAndJobClient(new UncheckpointedCollectResultBuffer(this.serializer, true), new TestUncheckpointedCoordinationRequestHandler(random.nextInt(3), arrayList, this.serializer, ACCUMULATOR_NAME)).f0;
            ArrayList arrayList2 = new ArrayList();
            while (collectResultIterator.hasNext()) {
                arrayList2.add(collectResultIterator.next());
            }
            HashSet hashSet = new HashSet(arrayList2);
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                Assert.assertTrue(hashSet.contains(Integer.valueOf(((Integer) it.next()).intValue())));
            }
            collectResultIterator.close();
        }
    }

    @Test
    public void testCheckpointedIterator() throws Exception {
        for (int i = 200; i > 0; i--) {
            ArrayList arrayList = new ArrayList();
            for (int i2 = 0; i2 < 200; i2++) {
                arrayList.add(Integer.valueOf(i2));
            }
            CollectResultIterator collectResultIterator = (CollectResultIterator) createIteratorAndJobClient(new CheckpointedCollectResultBuffer(this.serializer), new TestCheckpointedCoordinationRequestHandler(arrayList, this.serializer, ACCUMULATOR_NAME)).f0;
            ArrayList arrayList2 = new ArrayList();
            while (collectResultIterator.hasNext()) {
                arrayList2.add(collectResultIterator.next());
            }
            Assert.assertEquals(arrayList.size(), arrayList2.size());
            Collections.sort(arrayList);
            Collections.sort(arrayList2);
            Assert.assertArrayEquals(arrayList.toArray(new Integer[0]), arrayList2.toArray(new Integer[0]));
            collectResultIterator.close();
        }
    }

    @Test
    public void testEarlyClose() throws Exception {
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 200; i++) {
            arrayList.add(Integer.valueOf(i));
        }
        Tuple2<CollectResultIterator<Integer>, JobClient> createIteratorAndJobClient = createIteratorAndJobClient(new CheckpointedCollectResultBuffer(this.serializer), new TestCheckpointedCoordinationRequestHandler(arrayList, this.serializer, ACCUMULATOR_NAME));
        CollectResultIterator collectResultIterator = (CollectResultIterator) createIteratorAndJobClient.f0;
        JobClient jobClient = (JobClient) createIteratorAndJobClient.f1;
        for (int i2 = 0; i2 < 100; i2++) {
            Assert.assertTrue(collectResultIterator.hasNext());
            Assert.assertNotNull(collectResultIterator.next());
        }
        Assert.assertTrue(collectResultIterator.hasNext());
        collectResultIterator.close();
        Assert.assertEquals(JobStatus.CANCELED, jobClient.getJobStatus().get());
    }

    private Tuple2<CollectResultIterator<Integer>, JobClient> createIteratorAndJobClient(AbstractCollectResultBuffer<Integer> abstractCollectResultBuffer, final AbstractTestCoordinationRequestHandler<Integer> abstractTestCoordinationRequestHandler) {
        CollectResultIterator collectResultIterator = new CollectResultIterator(abstractCollectResultBuffer, CompletableFuture.completedFuture(TEST_OPERATOR_ID), ACCUMULATOR_NAME, 0);
        TestJobClient testJobClient = new TestJobClient(TEST_JOB_ID, TEST_OPERATOR_ID, abstractTestCoordinationRequestHandler, new TestJobClient.JobInfoProvider() { // from class: org.apache.flink.streaming.api.operators.collect.CollectResultIteratorTest.1
            @Override // org.apache.flink.streaming.api.operators.collect.utils.TestJobClient.JobInfoProvider
            public boolean isJobFinished() {
                return abstractTestCoordinationRequestHandler.isClosed();
            }

            @Override // org.apache.flink.streaming.api.operators.collect.utils.TestJobClient.JobInfoProvider
            public Map<String, OptionalFailure<Object>> getAccumulatorResults() {
                return abstractTestCoordinationRequestHandler.getAccumulatorResults();
            }
        });
        collectResultIterator.setJobClient(testJobClient);
        return Tuple2.of(collectResultIterator, testJobClient);
    }
}
