package org.apache.flink.runtime.asyncprocessing;

import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.runtime.asyncprocessing.EpochManager;
import org.assertj.core.api.AssertionsForClassTypes;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/asyncprocessing/EpochManagerTest.class */
/**
 * Unit tests for {@link EpochManager}.
 *
 * <p>Each test drives the manager directly through {@code onRecord}, {@code onNonRecord} and
 * {@code completeOneRecord} and then inspects the epoch state it exposes ({@code status},
 * {@code ongoingRecordCount}, {@code outputQueue}, {@code activeEpoch}). The manager is always
 * constructed with a {@code null} {@link AsyncExecutionController}.
 */
class EpochManagerTest {

    /**
     * Exercises a single action passed as the second callback of {@code onNonRecord} (the first
     * callback is {@code null}).
     *
     * <p>Expectations checked, in order:
     *
     * <ol>
     *   <li>two consecutive {@code onRecord()} calls return equal epochs with a record count of 2;
     *   <li>after {@code onNonRecord}, the next {@code onRecord()} returns a different epoch, the
     *       output queue holds one {@code CLOSED} epoch and the action has not run;
     *   <li>once all three records are completed the action has run exactly once, the queue is
     *       empty and the active epoch is {@code OPEN} with no ongoing records;
     *   <li>a further {@code onNonRecord} issued with no records outstanding has run its action
     *       by the time the call returns, even though that action itself starts and completes a
     *       record on the same manager.
     * </ol>
     */
    @Test
    void testBasic() {
        final AtomicInteger actionRuns = new AtomicInteger(0);
        final EpochManager manager = new EpochManager((AsyncExecutionController) null);

        // Two records arriving back to back are expected to land in the same epoch.
        final EpochManager.Epoch firstHandle = manager.onRecord();
        final EpochManager.Epoch secondHandle = manager.onRecord();
        AssertionsForClassTypes.assertThat(firstHandle).isEqualTo(secondHandle);
        AssertionsForClassTypes.assertThat(firstHandle.ongoingRecordCount).isEqualTo(2);

        manager.onNonRecord(
                (Runnable) null,
                () -> {
                    actionRuns.incrementAndGet();
                },
                EpochManager.ParallelMode.PARALLEL_BETWEEN_EPOCH);

        // The record that follows the non-record must be handed a different epoch.
        final EpochManager.Epoch thirdHandle = manager.onRecord();
        AssertionsForClassTypes.assertThat(thirdHandle).isNotEqualTo(firstHandle);

        // One epoch is queued and closed; its action is still pending because its records
        // have not been completed yet.
        AssertionsForClassTypes.assertThat(manager.outputQueue.size()).isEqualTo(1);
        final EpochManager.Epoch queuedEpoch = (EpochManager.Epoch) manager.outputQueue.peek();
        AssertionsForClassTypes.assertThat(queuedEpoch.status)
                .isEqualTo(EpochManager.EpochStatus.CLOSED);
        AssertionsForClassTypes.assertThat(actionRuns.get()).isEqualTo(0);

        manager.completeOneRecord(firstHandle);
        manager.completeOneRecord(secondHandle);
        manager.completeOneRecord(thirdHandle);

        // All records done: the action ran once and only an idle, open epoch remains.
        AssertionsForClassTypes.assertThat(actionRuns.get()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(manager.outputQueue.size()).isEqualTo(0);
        AssertionsForClassTypes.assertThat(manager.activeEpoch.ongoingRecordCount).isEqualTo(0);
        AssertionsForClassTypes.assertThat(manager.activeEpoch.status)
                .isEqualTo(EpochManager.EpochStatus.OPEN);

        // Re-entrant case: the action opens and completes a record on the same manager.
        manager.onNonRecord(
                (Runnable) null,
                () -> {
                    actionRuns.incrementAndGet();
                    final EpochManager.Epoch nestedHandle = manager.onRecord();
                    manager.completeOneRecord(nestedHandle);
                },
                EpochManager.ParallelMode.PARALLEL_BETWEEN_EPOCH);
        AssertionsForClassTypes.assertThat(actionRuns.get()).isEqualTo(2);
    }

    /**
     * Exercises {@code onNonRecord} with both callbacks supplied: the first calls
     * {@code onEpoch} on the current epoch, the second increments a counter.
     *
     * <p>Expectations checked, in order:
     *
     * <ol>
     *   <li>right after {@code onNonRecord} the epoch is {@code CLOSED} and the second callback
     *       has not run;
     *   <li>after the two records are completed the epoch is {@code FINISHING} and the second
     *       callback still has not run;
     *   <li>one more {@code completeOneRecord} on the same epoch moves it to {@code FINISHED}
     *       and the second callback has run exactly once.
     * </ol>
     */
    @Test
    void testTwoAction() {
        final AtomicInteger secondActionRuns = new AtomicInteger(0);
        final EpochManager manager = new EpochManager((AsyncExecutionController) null);

        final EpochManager.Epoch epoch = manager.onRecord();
        final EpochManager.Epoch sameEpoch = manager.onRecord();
        AssertionsForClassTypes.assertThat(epoch).isEqualTo(sameEpoch);
        AssertionsForClassTypes.assertThat(epoch.ongoingRecordCount).isEqualTo(2);

        manager.onNonRecord(
                () -> {
                    // NOTE(review): presumably this registers extra pending work on the epoch,
                    // since a third completeOneRecord is needed below - confirm in EpochManager.
                    manager.onEpoch(epoch);
                },
                () -> {
                    secondActionRuns.incrementAndGet();
                },
                EpochManager.ParallelMode.PARALLEL_BETWEEN_EPOCH);
        AssertionsForClassTypes.assertThat(epoch.status)
                .isEqualTo(EpochManager.EpochStatus.CLOSED);
        AssertionsForClassTypes.assertThat(secondActionRuns.get()).isEqualTo(0);

        // Completing the two real records is not enough to finish the epoch.
        manager.completeOneRecord(epoch);
        manager.completeOneRecord(sameEpoch);
        AssertionsForClassTypes.assertThat(epoch.status)
                .isEqualTo(EpochManager.EpochStatus.FINISHING);
        AssertionsForClassTypes.assertThat(secondActionRuns.get()).isEqualTo(0);

        // One further completion finishes the epoch and releases the second callback.
        manager.completeOneRecord(epoch);
        AssertionsForClassTypes.assertThat(epoch.status)
                .isEqualTo(EpochManager.EpochStatus.FINISHED);
        AssertionsForClassTypes.assertThat(secondActionRuns.get()).isEqualTo(1);
    }
}
