package org.apache.flink.runtime.source.coordinator;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.flink.api.common.eventtime.WatermarkAlignmentParams;
import org.apache.flink.api.connector.source.ReaderInfo;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SupportsHandleExecutionAttemptSourceEvent;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorCheckpointSerializer;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.source.coordinator.TestingSplitEnumerator;
import org.apache.flink.runtime.source.event.NoMoreSplitsEvent;
import org.apache.flink.runtime.source.event.RequestSplitEvent;
import org.apache.flink.runtime.source.event.SourceEventWrapper;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/source/coordinator/SourceCoordinatorConcurrentAttemptsTest.class */
class SourceCoordinatorConcurrentAttemptsTest extends SourceCoordinatorTestBase {
    private boolean enumeratorSupportsHandleExecutionAttemptSourceEvent;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/source/coordinator/SourceCoordinatorConcurrentAttemptsTest$TestEnumerator.class */
    /**
     * Split enumerator for these tests that also implements
     * {@link SupportsHandleExecutionAttemptSourceEvent}. Every attempt-level source event is
     * remembered under its (subtask id, attempt number) pair and then forwarded to the inherited
     * subtask-level {@code handleSourceEvent(int, SourceEvent)}.
     *
     * @param <SplitT> type of the splits handled by the enumerator
     */
    public static class TestEnumerator<SplitT extends SourceSplit> extends TestingSplitEnumerator<SplitT> implements SupportsHandleExecutionAttemptSourceEvent {
        /** Last event received per attempt: subtask id -> (attempt number -> event). */
        private final Map<Integer, Map<Integer, SourceEvent>> sourceEvents = new HashMap<>();

        private TestEnumerator(SplitEnumeratorContext<SplitT> splitEnumeratorContext) {
            super(splitEnumeratorContext);
        }

        private TestEnumerator(SplitEnumeratorContext<SplitT> splitEnumeratorContext, Collection<SplitT> collection) {
            super(splitEnumeratorContext, collection);
        }

        /**
         * Records the event for the given attempt and then delegates to the subtask-level handler.
         *
         * @param subtaskId index of the subtask that sent the event
         * @param attemptNumber execution attempt of that subtask
         * @param sourceEvent the event that was received
         */
        public void handleSourceEvent(int subtaskId, int attemptNumber, SourceEvent sourceEvent) {
            // The mapping function deliberately ignores its argument: handing the subtask id to
            // HashMap(int) would treat it as an initial capacity (and reject negative ids).
            this.sourceEvents
                    .computeIfAbsent(subtaskId, ignoredSubtaskId -> new HashMap<>())
                    .put(attemptNumber, sourceEvent);
            handleSourceEvent(subtaskId, sourceEvent);
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Looks up the event recorded for one execution attempt.
         *
         * @param subtaskId index of the subtask
         * @param attemptNumber execution attempt of that subtask
         * @return the recorded event, or {@code null} if none was recorded for that attempt
         */
        public SourceEvent getEvent(int subtaskId, int attemptNumber) {
            Map<Integer, SourceEvent> eventsOfSubtask =
                    this.sourceEvents.getOrDefault(subtaskId, Collections.emptyMap());
            return eventsOfSubtask.get(attemptNumber);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/source/coordinator/SourceCoordinatorConcurrentAttemptsTest$TestSource.class */
    /**
     * Factory source whose enumerators are {@link TestEnumerator} instances, both when created
     * fresh and when restored from a set of splits.
     *
     * @param <T> record type of the source
     * @param <SplitT> split type of the source
     */
    private static final class TestSource<T, SplitT extends SourceSplit> extends TestingSplitEnumerator.FactorySource<T, SplitT> {
        public TestSource(SimpleVersionedSerializer<SplitT> simpleVersionedSerializer, SimpleVersionedSerializer<Set<SplitT>> simpleVersionedSerializer2) {
            super(simpleVersionedSerializer, simpleVersionedSerializer2);
        }

        /**
         * Creates a fresh {@link TestEnumerator} bound to the given context.
         *
         * @param splitEnumeratorContext context handed to the new enumerator
         * @return the new enumerator
         */
        @Override // org.apache.flink.runtime.source.coordinator.TestingSplitEnumerator.FactorySource
        /* renamed from: createEnumerator */
        public TestingSplitEnumerator<SplitT> mo552createEnumerator(SplitEnumeratorContext<SplitT> splitEnumeratorContext) {
            return new TestEnumerator<>(splitEnumeratorContext);
        }

        /**
         * Creates a {@link TestEnumerator} from the given context and previously known splits.
         *
         * @param splitEnumeratorContext context handed to the new enumerator
         * @param set splits passed to the enumerator's restoring constructor
         * @return the new enumerator
         */
        @Override // org.apache.flink.runtime.source.coordinator.TestingSplitEnumerator.FactorySource
        public SplitEnumerator<SplitT, Set<SplitT>> restoreEnumerator(SplitEnumeratorContext<SplitT> splitEnumeratorContext, Set<SplitT> set) {
            return new TestEnumerator<>(splitEnumeratorContext, set);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/source/coordinator/SourceCoordinatorConcurrentAttemptsTest$TestSourceEvent.class */
    /** Payload-free source event; the tests only care about instance identity. */
    private static class TestSourceEvent implements SourceEvent, Serializable {
        private TestSourceEvent() {}
    }

    /** Package-private no-arg constructor; all per-test state is prepared in {@link #setup()}. */
    SourceCoordinatorConcurrentAttemptsTest() {}

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Turns on both test switches and then runs the base-class setup.
     *
     * <p>The switches are assigned before {@code super.setup()}; presumably the base setup builds
     * the coordinator from them (via {@link #createMockSource()}) - confirm against
     * {@code SourceCoordinatorTestBase}.
     */
    @Override // org.apache.flink.runtime.source.coordinator.SourceCoordinatorTestBase
    @BeforeEach
    public void setup() {
        enumeratorSupportsHandleExecutionAttemptSourceEvent = true;
        supportsConcurrentExecutionAttempts = true;
        super.setup();
    }

    /**
     * Building a coordinator with watermark alignment parameters must fail with an
     * {@link IllegalArgumentException} in this test's configuration.
     */
    @Test
    void testCoordinatorThrowExceptionIfWatermarkAlignmentIsEnabled() {
        enumeratorSupportsHandleExecutionAttemptSourceEvent = false;
        Assertions.assertThatThrownBy(
                        () ->
                                getNewSourceCoordinator(
                                        new WatermarkAlignmentParams(1000L, "group1", Long.MAX_VALUE)))
                .isInstanceOf(IllegalArgumentException.class);
    }

    /**
     * With the plain factory source (enumerator without attempt-level event support), delivering
     * a wrapped source event from an operator is expected to mark the job as failed.
     */
    @Test
    void testCoordinatorFailJobOnSourceEventToNonsupportingEnumerator() throws Exception {
        enumeratorSupportsHandleExecutionAttemptSourceEvent = false;

        // Rebuild the coordinator so that it picks up the non-supporting source.
        sourceCoordinator = getNewSourceCoordinator();
        context = sourceCoordinator.getContext();
        sourceCoordinator.start();

        SourceEventWrapper wrappedEvent = new SourceEventWrapper(new TestSourceEvent());
        sourceCoordinator.handleEventFromOperator(0, 0, wrappedEvent);
        waitForCoordinatorToProcessActions();

        boolean jobFailed = operatorCoordinatorContext.isJobFailed();
        Assertions.assertThat(jobFailed).isTrue();
    }

    /**
     * With the plain factory source, sending a source event through the subtask-level
     * {@code sendEventToSourceReader(int, SourceEvent)} is expected to throw an
     * {@link IllegalStateException}.
     */
    @Test
    void testContextThrowExceptionOnSourceEventToNonsupportingMethod() throws Exception {
        enumeratorSupportsHandleExecutionAttemptSourceEvent = false;

        // Rebuild the coordinator so that it picks up the non-supporting source.
        sourceCoordinator = getNewSourceCoordinator();
        context = sourceCoordinator.getContext();
        sourceCoordinator.start();
        sourceReady();

        Assertions.assertThatThrownBy(() -> context.sendEventToSourceReader(0, new TestSourceEvent()))
                .isInstanceOf(IllegalStateException.class);
    }

    /**
     * Drives three execution attempts (0, 3 and 5) of subtask 0 against an enumerator holding two
     * splits and checks the exact sequence of eight events sent to that subtask:
     *
     * <ol>
     *   <li>events 0-1: the first split, once after attempt 0 requests a split and once after
     *       attempt 3 registers;
     *   <li>events 2-3: the second split, after attempt 3 requests a split;
     *   <li>events 4-5: {@link NoMoreSplitsEvent}, after attempt 0 requests again;
     *   <li>events 6-7: both splits followed by {@link NoMoreSplitsEvent}, after attempt 5
     *       registers.
     * </ol>
     */
    @Test
    public void testConcurrentAttemptsRequestSplits() throws Exception {
        this.sourceCoordinator.start();

        // Typed list: a raw list would turn singletonList(splits.get(i)) into a List<Object>.
        List<MockSourceSplit> splits = new ArrayList<>();
        for (int splitId = 0; splitId < 2; splitId++) {
            splits.add(new MockSourceSplit(splitId));
        }
        getEnumerator().addNewSplits(splits);

        // Attempt 0 comes up and asks for a split.
        setReaderTaskReady(this.sourceCoordinator, 0, 0);
        registerReader(0, 0);
        this.sourceCoordinator.handleEventFromOperator(0, 0, new RequestSplitEvent());
        waitForSentEvents(1);

        // Attempt 3 registers; one more event is expected before it requests anything.
        setReaderTaskReady(this.sourceCoordinator, 0, 3);
        registerReader(0, 3);
        waitForSentEvents(2);

        this.sourceCoordinator.handleEventFromOperator(0, 3, new RequestSplitEvent());
        waitForSentEvents(4);

        this.sourceCoordinator.handleEventFromOperator(0, 0, new RequestSplitEvent());
        waitForSentEvents(6);

        // Attempt 5 registers after all splits were handed out.
        setReaderTaskReady(this.sourceCoordinator, 0, 5);
        registerReader(0, 5);
        waitForSentEvents(8);

        List<OperatorEvent> sentEvents = this.receivingTasks.getSentEventsForSubtask(0);
        List<MockSourceSplit> firstSplit = Collections.singletonList(splits.get(0));
        List<MockSourceSplit> secondSplit = Collections.singletonList(splits.get(1));

        assertAddSplitEvent(sentEvents.get(0), firstSplit);
        assertAddSplitEvent(sentEvents.get(1), firstSplit);
        assertAddSplitEvent(sentEvents.get(2), secondSplit);
        assertAddSplitEvent(sentEvents.get(3), secondSplit);
        Assertions.assertThat(sentEvents.get(4)).isInstanceOf(NoMoreSplitsEvent.class);
        Assertions.assertThat(sentEvents.get(5)).isInstanceOf(NoMoreSplitsEvent.class);
        assertAddSplitEvent(sentEvents.get(6), splits);
        Assertions.assertThat(sentEvents.get(7)).isInstanceOf(NoMoreSplitsEvent.class);
    }

    /**
     * Checks the per-attempt reader registry exposed by {@code registeredReadersOfAttempts()}:
     * both registered attempts of subtask 0 are listed with their locations, a failed attempt is
     * removed, and a subtask reset leaves the registry empty.
     */
    @Test
    public void testReaderInfoOfConcurrentAttempts() throws Exception {
        this.sourceCoordinator.start();
        registerReader(0, 3);
        registerReader(0, 5);
        waitForCoordinatorToProcessActions();

        // Only subtask 0 is known, with one entry per registered attempt.
        Assertions.assertThat(this.context.registeredReadersOfAttempts()).hasSize(1);
        Map<Integer, ReaderInfo> attemptReaders = this.context.registeredReadersOfAttempts().get(0);
        Assertions.assertThat(attemptReaders).containsOnlyKeys(3, 5);
        Assertions.assertThat(attemptReaders.get(3)).isNotNull();
        Assertions.assertThat(attemptReaders.get(3).getLocation()).isEqualTo(createLocationFor(0, 3));
        Assertions.assertThat(attemptReaders.get(5)).isNotNull();
        Assertions.assertThat(attemptReaders.get(5).getLocation()).isEqualTo(createLocationFor(0, 5));

        // Failing attempt 5 must leave only attempt 3 registered.
        this.sourceCoordinator.executionAttemptFailed(0, 5, new Exception());
        waitForCoordinatorToProcessActions();
        Assertions.assertThat(this.context.registeredReadersOfAttempts().get(0)).containsOnlyKeys(3);

        // Resetting the subtask must drop everything that is left.
        this.sourceCoordinator.subtaskReset(0, -1L);
        waitForCoordinatorToProcessActions();
        Assertions.assertThat(this.context.registeredReadersOfAttempts()).isEmpty();
    }

    /**
     * Checks the subtask-level view {@code registeredReaders()} when several attempts exist:
     * with attempts 3 and 5 registered the reported location is that of attempt 3, and after
     * attempt 1 registers it is that of attempt 1.
     */
    @Test
    public void testSubtaskReaderInfoOfConcurrentAttempts() throws Exception {
        sourceCoordinator.start();

        registerReader(0, 3);
        registerReader(0, 5);
        waitForCoordinatorToProcessActions();

        Assertions.assertThat(context.registeredReaders()).hasSize(1);
        ReaderInfo readerBefore = (ReaderInfo) context.registeredReaders().get(0);
        Assertions.assertThat(readerBefore.getLocation()).isEqualTo(createLocationFor(0, 3));

        registerReader(0, 1);
        waitForCoordinatorToProcessActions();

        ReaderInfo readerAfter = (ReaderInfo) context.registeredReaders().get(0);
        Assertions.assertThat(readerAfter.getLocation()).isEqualTo(createLocationFor(0, 1));
    }

    /**
     * Sends one wrapped source event from attempt 3 and another from attempt 5 of subtask 0 and
     * checks that the enumerator recorded exactly those instances under the matching attempts.
     */
    @Test
    public void testForwardAttemptSourceEvents() throws Exception {
        sourceCoordinator.start();

        TestSourceEvent eventOfAttempt3 = new TestSourceEvent();
        TestSourceEvent eventOfAttempt5 = new TestSourceEvent();
        sourceCoordinator.handleEventFromOperator(0, 3, new SourceEventWrapper(eventOfAttempt3));
        sourceCoordinator.handleEventFromOperator(0, 5, new SourceEventWrapper(eventOfAttempt5));
        waitForCoordinatorToProcessActions();

        SourceEvent recordedForAttempt3 = getTestEnumerator().getEvent(0, 3);
        Assertions.assertThat(recordedForAttempt3).isSameAs(eventOfAttempt3);
        SourceEvent recordedForAttempt5 = getTestEnumerator().getEvent(0, 5);
        Assertions.assertThat(recordedForAttempt5).isSameAs(eventOfAttempt5);
    }

    /**
     * Chooses the source under test based on
     * {@code enumeratorSupportsHandleExecutionAttemptSourceEvent}: a {@link TestSource} (whose
     * enumerator handles attempt-level source events) when the flag is set, otherwise the plain
     * factory source from {@link TestingSplitEnumerator}.
     *
     * @return the source to use for this test
     */
    @Override // org.apache.flink.runtime.source.coordinator.SourceCoordinatorTestBase
    Source<Integer, MockSourceSplit, Set<MockSourceSplit>> createMockSource() {
        if (this.enumeratorSupportsHandleExecutionAttemptSourceEvent) {
            return new TestSource<>(new MockSourceSplitSerializer(), new MockSplitEnumeratorCheckpointSerializer());
        }
        return TestingSplitEnumerator.factorySource(new MockSourceSplitSerializer(), new MockSplitEnumeratorCheckpointSerializer());
    }

    /**
     * Returns the enumerator of the base class downcast to {@link TestEnumerator}.
     *
     * @return the current enumerator as a {@code TestEnumerator}
     * @throws ClassCastException if the current enumerator is not a {@code TestEnumerator}, which
     *     is the case when {@link #createMockSource()} returned the plain factory source
     */
    private TestEnumerator<MockSourceSplit> getTestEnumerator() {
        return (TestEnumerator<MockSourceSplit>) super.getEnumerator();
    }
}
