package org.apache.flink.runtime.operators.coordination;

import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.operators.coordination.EventReceivingTasks;
import org.apache.flink.runtime.operators.coordination.util.IncompleteFuturesTracker;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.Assertions;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/operators/coordination/SubtaskGatewayImplTest.class */
/**
 * Unit tests for {@link SubtaskGatewayImpl}.
 *
 * <p>Each test builds a gateway on top of an {@link EventReceivingTasks} receiver and checks how
 * sending events interacts with marking, closing and re-opening the gateway.
 */
public class SubtaskGatewayImplTest {

    /** An event sent through a gateway that was never closed is recorded by the receiver. */
    @Test
    public void eventsPassThroughOpenGateway() {
        EventReceivingTasks receiver = EventReceivingTasks.createForRunningTasks();
        SubtaskGatewayImpl gateway = createGateway(receiver, 11);

        TestOperatorEvent event = new TestOperatorEvent();
        CompletableFuture<?> future = gateway.sendEvent(event);

        Assertions.assertThat(receiver.events)
                .containsExactly(new EventReceivingTasks.EventWithSubtask(event, 11));
        Assertions.assertThat(future).isDone();
    }

    /** Closing succeeds for the checkpoint id the gateway was marked with. */
    @Test
    public void closingMarkedGateway() {
        SubtaskGatewayImpl gateway =
                createGateway(EventReceivingTasks.createForRunningTasks(), 11);

        gateway.markForCheckpoint(200L);

        Assertions.assertThat(gateway.tryCloseGateway(200L)).isTrue();
    }

    /** Closing is refused when the gateway was never marked for any checkpoint. */
    @Test
    public void notClosingUnmarkedGateway() {
        SubtaskGatewayImpl gateway =
                createGateway(EventReceivingTasks.createForRunningTasks(), 11);

        Assertions.assertThat(gateway.tryCloseGateway(123L)).isFalse();
    }

    /** Closing is refused when the requested checkpoint id differs from the marked one. */
    @Test
    public void notClosingGatewayForOtherMark() {
        SubtaskGatewayImpl gateway =
                createGateway(EventReceivingTasks.createForRunningTasks(), 11);

        gateway.markForCheckpoint(100L);

        Assertions.assertThat(gateway.tryCloseGateway(123L)).isFalse();
    }

    /** While the gateway is closed, a sent event is not delivered and its future stays pending. */
    @Test
    public void eventsBlockedByClosedGateway() {
        EventReceivingTasks receiver = EventReceivingTasks.createForRunningTasks();
        SubtaskGatewayImpl gateway = createGateway(receiver, 1);

        gateway.markForCheckpoint(1L);
        gateway.tryCloseGateway(1L);

        CompletableFuture<?> future = gateway.sendEvent(new TestOperatorEvent());

        Assertions.assertThat(receiver.events).isEmpty();
        Assertions.assertThat(future).isNotDone();
    }

    /**
     * Events sent while the gateways were closed are delivered once the gateways are re-opened,
     * in the order in which the gateways are opened.
     */
    @Test
    public void eventsReleasedAfterOpeningGateway() {
        EventReceivingTasks receiver = EventReceivingTasks.createForRunningTasks();
        SubtaskGatewayImpl gatewayForSubtask0 = createGateway(receiver, 0);
        SubtaskGatewayImpl gatewayForSubtask3 = createGateway(receiver, 3);
        // Subtask 3 deliberately comes first; the expected event order below relies on it.
        List<SubtaskGatewayImpl> gateways = Arrays.asList(gatewayForSubtask3, gatewayForSubtask0);

        gateways.forEach(gateway -> gateway.markForCheckpoint(17L));
        gateways.forEach(gateway -> gateway.tryCloseGateway(17L));

        TestOperatorEvent eventForSubtask3 = new TestOperatorEvent();
        TestOperatorEvent eventForSubtask0 = new TestOperatorEvent();
        CompletableFuture<?> futureForSubtask3 = gatewayForSubtask3.sendEvent(eventForSubtask3);
        CompletableFuture<?> futureForSubtask0 = gatewayForSubtask0.sendEvent(eventForSubtask0);

        gateways.forEach(SubtaskGatewayImpl::openGatewayAndUnmarkAllCheckpoint);

        Assertions.assertThat(receiver.events)
                .containsExactly(
                        new EventReceivingTasks.EventWithSubtask(eventForSubtask3, 3),
                        new EventReceivingTasks.EventWithSubtask(eventForSubtask0, 0));
        Assertions.assertThat(futureForSubtask3).isDone();
        Assertions.assertThat(futureForSubtask0).isDone();
    }

    /**
     * When the receiver is created with failing RPCs, the future of an event that was held back
     * by a closed gateway completes exceptionally after the gateway is re-opened.
     */
    @Test
    public void releasedEventsForwardSendFailures() {
        EventReceivingTasks receiver =
                EventReceivingTasks.createForRunningTasksFailingRpcs(new FlinkException("test"));
        SubtaskGatewayImpl gateway = createGateway(receiver, 10);

        gateway.markForCheckpoint(17L);
        gateway.tryCloseGateway(17L);

        CompletableFuture<?> future = gateway.sendEvent(new TestOperatorEvent());
        gateway.openGatewayAndUnmarkAllCheckpoint();

        Assertions.assertThat(future).isCompletedExceptionally();
    }

    /**
     * Creates a gateway for the single access that {@code receiver} exposes for {@code subtask},
     * using the main-thread executor adapter and a fresh {@link IncompleteFuturesTracker}.
     *
     * @param receiver the receiver whose subtask access backs the gateway
     * @param subtask the subtask number passed to {@code getAccessesForSubtask}
     * @return a new gateway bound to that subtask access
     */
    private static SubtaskGatewayImpl createGateway(EventReceivingTasks receiver, int subtask) {
        // NOTE(review): the cast is redundant if getAccessesForSubtask is declared to return
        // Collection<SubtaskAccess> - confirm against EventReceivingTasks and drop it.
        SubtaskAccess access =
                (SubtaskAccess) getUniqueElement(receiver.getAccessesForSubtask(subtask));
        return new SubtaskGatewayImpl(
                access,
                ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                new IncompleteFuturesTracker());
    }

    /**
     * Returns the only element of {@code collection}.
     *
     * @param collection the collection expected to hold exactly one element
     * @param <T> the element type
     * @return the single element
     * @throws java.util.NoSuchElementException if the collection is empty
     * @throws IllegalStateException presumably, if the collection has more than one element
     *     (raised by {@code Preconditions.checkState}; exception type not visible in this file)
     */
    private static <T> T getUniqueElement(Collection<T> collection) {
        Iterator<T> it = collection.iterator();
        T next = it.next();
        Preconditions.checkState(!it.hasNext());
        return next;
    }
}
