package org.apache.flink.runtime.operators.coordination;

import java.util.concurrent.CompletableFuture;
import org.apache.flink.runtime.operators.coordination.EventReceivingTasks;
import org.apache.flink.util.FlinkException;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/operators/coordination/OperatorEventValveTest.class */
public class OperatorEventValveTest {
    @Test
    public void eventsPassThroughOpenValve() {
        EventReceivingTasks createForRunningTasks = EventReceivingTasks.createForRunningTasks();
        OperatorEventValve operatorEventValve = new OperatorEventValve();
        TestOperatorEvent testOperatorEvent = new TestOperatorEvent();
        CompletableFuture completableFuture = new CompletableFuture();
        operatorEventValve.sendEvent(createForRunningTasks.createSendAction(testOperatorEvent, 11), completableFuture);
        Assert.assertThat(createForRunningTasks.events, Matchers.contains(new EventReceivingTasks.EventWithSubtask[]{new EventReceivingTasks.EventWithSubtask(testOperatorEvent, 11)}));
        Assert.assertTrue(completableFuture.isDone());
    }

    @Test
    public void shuttingMarkedValve() {
        OperatorEventValve operatorEventValve = new OperatorEventValve();
        operatorEventValve.markForCheckpoint(200L);
        Assert.assertTrue(operatorEventValve.tryShutValve(200L));
    }

    @Test
    public void notShuttingUnmarkedValve() {
        Assert.assertFalse(new OperatorEventValve().tryShutValve(123L));
    }

    @Test
    public void notShuttingValveForOtherMark() {
        OperatorEventValve operatorEventValve = new OperatorEventValve();
        operatorEventValve.markForCheckpoint(100L);
        Assert.assertFalse(operatorEventValve.tryShutValve(123L));
    }

    @Test
    public void eventsBlockedByClosedValve() {
        EventReceivingTasks createForRunningTasks = EventReceivingTasks.createForRunningTasks();
        OperatorEventValve operatorEventValve = new OperatorEventValve();
        operatorEventValve.markForCheckpoint(1L);
        operatorEventValve.tryShutValve(1L);
        CompletableFuture completableFuture = new CompletableFuture();
        operatorEventValve.sendEvent(createForRunningTasks.createSendAction(new TestOperatorEvent(), 1), completableFuture);
        Assert.assertTrue(createForRunningTasks.events.isEmpty());
        Assert.assertFalse(completableFuture.isDone());
    }

    @Test
    public void eventsReleasedAfterOpeningValve() {
        EventReceivingTasks createForRunningTasks = EventReceivingTasks.createForRunningTasks();
        OperatorEventValve operatorEventValve = new OperatorEventValve();
        operatorEventValve.markForCheckpoint(17L);
        operatorEventValve.tryShutValve(17L);
        TestOperatorEvent testOperatorEvent = new TestOperatorEvent();
        TestOperatorEvent testOperatorEvent2 = new TestOperatorEvent();
        CompletableFuture completableFuture = new CompletableFuture();
        operatorEventValve.sendEvent(createForRunningTasks.createSendAction(testOperatorEvent, 3), completableFuture);
        CompletableFuture completableFuture2 = new CompletableFuture();
        operatorEventValve.sendEvent(createForRunningTasks.createSendAction(testOperatorEvent2, 0), completableFuture2);
        operatorEventValve.openValveAndUnmarkCheckpoint();
        Assert.assertThat(createForRunningTasks.events, Matchers.contains(new EventReceivingTasks.EventWithSubtask[]{new EventReceivingTasks.EventWithSubtask(testOperatorEvent, 3), new EventReceivingTasks.EventWithSubtask(testOperatorEvent2, 0)}));
        Assert.assertTrue(completableFuture.isDone());
        Assert.assertTrue(completableFuture2.isDone());
    }

    @Test
    public void releasedEventsForwardSendFailures() {
        EventReceivingTasks createForRunningTasksFailingRpcs = EventReceivingTasks.createForRunningTasksFailingRpcs(new FlinkException("test"));
        OperatorEventValve operatorEventValve = new OperatorEventValve();
        operatorEventValve.markForCheckpoint(17L);
        operatorEventValve.tryShutValve(17L);
        CompletableFuture completableFuture = new CompletableFuture();
        operatorEventValve.sendEvent(createForRunningTasksFailingRpcs.createSendAction(new TestOperatorEvent(), 10), completableFuture);
        operatorEventValve.openValveAndUnmarkCheckpoint();
        Assert.assertTrue(completableFuture.isCompletedExceptionally());
    }
}
