package org.apache.flink.streaming.api.operators;

import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.mocks.MockSourceReader;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.runtime.source.event.WatermarkAlignmentEvent;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.operators.source.CollectingDataOutput;
import org.apache.flink.streaming.api.operators.source.TestingSourceOperator;
import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.util.MockOutput;
import org.apache.flink.streaming.util.MockStreamConfig;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.class */
public class SourceOperatorSplitWatermarkAlignmentTest {
    /**
     * Watermark generator that tracks the largest timestamp it has been handed.
     *
     * <p>A watermark is emitted immediately whenever an event carries a timestamp strictly greater
     * than the current maximum. On every periodic callback the current maximum is emitted again;
     * until an event has raised it, that value is {@code Long.MIN_VALUE}.
     *
     * <p>This constant holds a single stateful instance, so everything that uses it shares the
     * same running maximum.
     */
    public static final WatermarkGenerator<Integer> WATERMARK_GENERATOR =
            new WatermarkGenerator<Integer>() {
                /** Highest event timestamp observed so far. */
                private long maxWatermark = Long.MIN_VALUE;

                @Override
                public void onEvent(Integer event, long eventTimestamp, WatermarkOutput output) {
                    if (eventTimestamp <= maxWatermark) {
                        // Not a new maximum, so there is nothing to emit.
                        return;
                    }
                    maxWatermark = eventTimestamp;
                    output.emitWatermark(new Watermark(eventTimestamp));
                }

                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    output.emitWatermark(new Watermark(maxWatermark));
                }
            };

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest$SplitAligningSourceReader.class */
    private static class SplitAligningSourceReader extends MockSourceReader {
        Set<String> pausedSplits;

        public SplitAligningSourceReader() {
            super(MockSourceReader.WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, false, true);
            this.pausedSplits = new HashSet();
        }

        public void pauseOrResumeSplits(Collection<String> collection, Collection<String> collection2) {
            this.pausedSplits.removeAll(collection2);
            this.pausedSplits.addAll(collection);
        }
    }

    /**
     * Feeds two splits into a source operator configured with watermark alignment, then
     * interleaves record emission with {@link WatermarkAlignmentEvent}s and checks which split IDs
     * the reader has been told to pause after each step.
     *
     * @throws Exception if setting up or driving the operator fails
     */
    @Test
    public void testSplitWatermarkAlignment() throws Exception {
        SplitAligningSourceReader sourceReader = new SplitAligningSourceReader();

        // Timestamps are the record values themselves; every split shares WATERMARK_GENERATOR.
        WatermarkStrategy<Integer> watermarkStrategy =
                WatermarkStrategy.forGenerator(context -> WATERMARK_GENERATOR)
                        .withTimestampAssigner((value, recordTimestamp) -> value.intValue())
                        .withWatermarkAlignment("group-1", Duration.ofMillis(1L));

        // NOTE(review): the generic type arguments of the test utilities below appear to have
        // been erased; restore them once their declarations have been checked.
        TestingSourceOperator operator =
                new TestingSourceOperator(
                        sourceReader,
                        watermarkStrategy,
                        new TestProcessingTimeService(),
                        new MockOperatorEventGateway(),
                        1,
                        5,
                        true);

        Environment environment = getTestingEnvironment();
        operator.setup(
                new SourceOperatorStreamTask(environment),
                new MockStreamConfig(new Configuration(), 1),
                new MockOutput(new ArrayList<>()));
        operator.initializeState(
                new StreamTaskStateInitializerImpl(environment, new MemoryStateBackend()));
        operator.open();

        // Split "0" carries the records 5 and 11, split "1" carries 3 and 12.
        MockSourceSplit firstSplit = new MockSourceSplit(0, 0, 10);
        MockSourceSplit secondSplit = new MockSourceSplit(1, 10, 20);
        firstSplit.addRecord(5);
        firstSplit.addRecord(11);
        secondSplit.addRecord(3);
        secondSplit.addRecord(12);
        operator.handleOperatorEvent(
                new AddSplitEvent(
                        Arrays.asList(firstSplit, secondSplit), new MockSourceSplitSerializer()));

        CollectingDataOutput dataOutput = new CollectingDataOutput();

        // Presumably emits 5 from split "0" -- TODO confirm the reader's emission order.
        operator.emitNext(dataOutput);

        // An alignment event of 4 is expected to pause split "0".
        operator.handleOperatorEvent(new WatermarkAlignmentEvent(4L));
        Assertions.assertThat(sourceReader.pausedSplits).containsExactly("0");

        // Raising the value to 5 is expected to resume it again.
        operator.handleOperatorEvent(new WatermarkAlignmentEvent(5L));
        Assertions.assertThat(sourceReader.pausedSplits).isEmpty();

        // Presumably emits 11 from split "0" and then 3 from split "1".
        operator.emitNext(dataOutput);
        operator.emitNext(dataOutput);
        Assertions.assertThat(sourceReader.pausedSplits).containsExactly("0");

        // Presumably emits 12 from split "1", after which both splits are expected to be paused.
        operator.emitNext(dataOutput);
        // pausedSplits is a HashSet, so the two-element check must not depend on iteration order.
        Assertions.assertThat(sourceReader.pausedSplits).containsExactlyInAnyOrder("0", "1");
    }

    /**
     * Builds a fresh {@link StreamMockEnvironment} for the operator under test.
     *
     * <p>Every collaborator is newly created with its no-argument constructor, and the two numeric
     * arguments are the literals {@code 1L} and {@code 1}.
     *
     * @return a new mock environment; nothing is shared between calls
     */
    private Environment getTestingEnvironment() {
        StreamMockEnvironment environment =
                new StreamMockEnvironment(
                        new Configuration(),
                        new Configuration(),
                        new ExecutionConfig(),
                        1L,
                        new MockInputSplitProvider(),
                        1,
                        new TestTaskStateManager());
        return environment;
    }

    /**
     * Re-creates one of this class's two serializable lambdas from its serialized description.
     *
     * <p>The implementation-method name selects the candidate lambda; the remaining attributes of
     * {@code serializedLambda} must then match that candidate exactly.
     *
     * <p>NOTE(review): this method carries the compiler's {@code synthetic} marker and the
     * reserved name {@code $deserializeLambda$}. The compiler presumably generates it for
     * serializable lambdas, so confirm whether this hand-written copy should be removed.
     *
     * @param serializedLambda serialized form of the lambda to restore
     * @return the restored timestamp assigner or watermark-generator supplier
     * @throws IllegalArgumentException if {@code serializedLambda} describes neither lambda
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        switch (serializedLambda.getImplMethodName()) {
            case "lambda$testSplitWatermarkAlignment$1a4d0a14$1":
                if (describesStaticLambda(
                        serializedLambda,
                        "org/apache/flink/api/common/eventtime/SerializableTimestampAssigner",
                        "extractTimestamp",
                        "(Ljava/lang/Object;J)J",
                        "(Ljava/lang/Integer;J)J")) {
                    // The cast supplies the target type a lambda needs when returned as Object.
                    return (org.apache.flink.api.common.eventtime.SerializableTimestampAssigner<
                                    Integer>)
                            (value, recordTimestamp) -> value.intValue();
                }
                break;
            case "lambda$testSplitWatermarkAlignment$dcee6a8b$1":
                if (describesStaticLambda(
                        serializedLambda,
                        "org/apache/flink/api/common/eventtime/WatermarkGeneratorSupplier",
                        "createWatermarkGenerator",
                        "(Lorg/apache/flink/api/common/eventtime/WatermarkGeneratorSupplier$Context;)Lorg/apache/flink/api/common/eventtime/WatermarkGenerator;",
                        "(Lorg/apache/flink/api/common/eventtime/WatermarkGeneratorSupplier$Context;)Lorg/apache/flink/api/common/eventtime/WatermarkGenerator;")) {
                    return (org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier<
                                    Integer>)
                            context -> WATERMARK_GENERATOR;
                }
                break;
            default:
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }

    /**
     * Checks that {@code lambda} describes a lambda implemented by a static method of this test
     * class with the given functional interface and signatures.
     */
    private static boolean describesStaticLambda(
            SerializedLambda lambda,
            String functionalInterfaceClass,
            String functionalInterfaceMethodName,
            String functionalInterfaceMethodSignature,
            String implMethodSignature) {
        // 6 is the value of java.lang.invoke.MethodHandleInfo.REF_invokeStatic.
        return lambda.getImplMethodKind() == 6
                && lambda.getFunctionalInterfaceClass().equals(functionalInterfaceClass)
                && lambda.getFunctionalInterfaceMethodName().equals(functionalInterfaceMethodName)
                && lambda.getFunctionalInterfaceMethodSignature()
                        .equals(functionalInterfaceMethodSignature)
                && lambda.getImplClass()
                        .equals(
                                "org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest")
                && lambda.getImplMethodSignature().equals(implMethodSignature);
    }
}
