package org.apache.flink.streaming.api.watermark.generalized;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.api.common.watermark.BoolWatermark;
import org.apache.flink.api.common.watermark.BoolWatermarkDeclaration;
import org.apache.flink.api.common.watermark.LongWatermark;
import org.apache.flink.api.common.watermark.LongWatermarkDeclaration;
import org.apache.flink.api.common.watermark.Watermark;
import org.apache.flink.api.common.watermark.WatermarkCombinationFunction;
import org.apache.flink.api.common.watermark.WatermarkCombinationPolicy;
import org.apache.flink.api.common.watermark.WatermarkDeclarations;
import org.apache.flink.api.common.watermark.WatermarkHandlingStrategy;
import org.apache.flink.streaming.runtime.watermark.AbstractInternalWatermarkDeclaration;
import org.apache.flink.streaming.runtime.watermark.AlignedWatermarkCombiner;
import org.apache.flink.streaming.runtime.watermark.InternalLongWatermarkDeclaration;
import org.apache.flink.streaming.runtime.watermark.WatermarkCombiner;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/watermark/generalized/WatermarkCombinerTest.class */
/**
 * Tests for the {@link WatermarkCombiner} implementations created from generalized watermark
 * declarations.
 *
 * <p>Every test creates a combiner for two input channels, feeds it one watermark at a time and,
 * after each step, asserts the complete sequence of watermarks the combiner has emitted so far.
 */
class WatermarkCombinerTest {
    private static final String DEFAULT_WATERMARK_IDENTIFIER = "default";

    /** Watermarks emitted by the combiner under test, in emission order. */
    private final List<Watermark> receivedWatermarks = new ArrayList<>();

    /** Combiner under test; each test method creates its own instance. */
    private WatermarkCombiner combiner;

    /** Resets the shared fixture so that no state leaks between test methods. */
    @BeforeEach
    public void setup() {
        this.receivedWatermarks.clear();
        this.combiner = null;
    }

    /**
     * Verifies that the {@link AlignedWatermarkCombiner} emits a watermark, and runs the callback
     * passed to its constructor, only once the same watermark arrived on both channels.
     */
    @Test
    void testAlignedWatermarkCombiner() throws Exception {
        InternalLongWatermarkDeclaration declaration =
                new InternalLongWatermarkDeclaration(
                        DEFAULT_WATERMARK_IDENTIFIER,
                        new WatermarkCombinationPolicy(
                                WatermarkCombinationFunction.NumericWatermarkCombinationFunction
                                        .MIN,
                                true),
                        WatermarkHandlingStrategy.FORWARD,
                        true);
        AtomicBoolean callbackInvoked = new AtomicBoolean(false);
        this.combiner = new AlignedWatermarkCombiner(2, () -> callbackInvoked.set(true));

        // Step 1: watermark 1 on channel 0 only -> nothing emitted, callback not run.
        executeAndCheckCombineStepWithLongWatermark(declaration.newWatermark(1L), 0);
        Assertions.assertThat(callbackInvoked.get()).isFalse();

        // Step 2: watermark 1 on channel 1 as well -> 1 is emitted and the callback runs.
        executeAndCheckCombineStepWithLongWatermark(declaration.newWatermark(1L), 1, 1L);
        Assertions.assertThat(callbackInvoked.get()).isTrue();

        // Re-arm the flag so the second alignment round is observed independently.
        callbackInvoked.set(false);

        // Step 3: watermark 2 on channel 1 only -> still waiting for channel 0.
        executeAndCheckCombineStepWithLongWatermark(declaration.newWatermark(2L), 1, 1L);
        Assertions.assertThat(callbackInvoked.get()).isFalse();

        // Step 4: watermark 2 on channel 0 -> 2 is emitted and the callback runs again.
        executeAndCheckCombineStepWithLongWatermark(declaration.newWatermark(2L), 0, 1L, 2L);
        Assertions.assertThat(callbackInvoked.get()).isTrue();
    }

    /**
     * Verifies a MAX long combiner that waits for all channels: nothing is emitted until both
     * channels have delivered a watermark.
     */
    @Test
    void testLongWatermarkCombinerWaitForAllChannels() throws Exception {
        LongWatermarkDeclaration declaration =
                WatermarkDeclarations.newBuilder(DEFAULT_WATERMARK_IDENTIFIER)
                        .typeLong()
                        .combineFunctionMax()
                        .combineWaitForAllChannels(true)
                        .build();
        this.combiner =
                AbstractInternalWatermarkDeclaration.from(declaration)
                        .createWatermarkCombiner(2, (Runnable) null);

        // Step | channel 0 | channel 1 | emitted so far
        //  1   |     3     |           | []
        //  2   |     2     |           | []
        //  3   |           |     1     | [2]
        //  4   |     3     |           | [2, 3]
        executeAndCheckCombineStepWithLongWatermark(declaration.newWatermark(3L), 0);
        executeAndCheckCombineStepWithLongWatermark(declaration.newWatermark(2L), 0);
        executeAndCheckCombineStepWithLongWatermark(declaration.newWatermark(1L), 1, 2L);
        executeAndCheckCombineStepWithLongWatermark(declaration.newWatermark(3L), 0, 2L, 3L);
    }

    /** Verifies a MAX long combiner that does not wait for all channels. */
    @Test
    void testLongWatermarkCombinerCombineMax() throws Exception {
        LongWatermarkDeclaration declaration =
                WatermarkDeclarations.newBuilder(DEFAULT_WATERMARK_IDENTIFIER)
                        .typeLong()
                        .combineFunctionMax()
                        .build();
        this.combiner =
                AbstractInternalWatermarkDeclaration.from(declaration)
                        .createWatermarkCombiner(2, (Runnable) null);

        // Step | channel 0 | channel 1 | emitted so far
        //  1   |     1     |           | [1]
        //  2   |     2     |           | [1, 2]
        //  3   |           |     2     | [1, 2]
        //  4   |           |     3     | [1, 2, 3]
        //  5   |           |     2     | [1, 2, 3, 2]
        //  6   |     2     |           | [1, 2, 3, 2]
        executeAndCheckCombineStepWithLongWatermark(declaration.newWatermark(1L), 0, 1L);
        executeAndCheckCombineStepWithLongWatermark(declaration.newWatermark(2L), 0, 1L, 2L);
        executeAndCheckCombineStepWithLongWatermark(declaration.newWatermark(2L), 1, 1L, 2L);
        executeAndCheckCombineStepWithLongWatermark(
                declaration.newWatermark(3L), 1, 1L, 2L, 3L);
        executeAndCheckCombineStepWithLongWatermark(
                declaration.newWatermark(2L), 1, 1L, 2L, 3L, 2L);
        executeAndCheckCombineStepWithLongWatermark(
                declaration.newWatermark(2L), 0, 1L, 2L, 3L, 2L);
    }

    /** Verifies a MIN long combiner that does not wait for all channels. */
    @Test
    void testLongWatermarkCombinerCombineMin() throws Exception {
        LongWatermarkDeclaration declaration =
                WatermarkDeclarations.newBuilder(DEFAULT_WATERMARK_IDENTIFIER)
                        .typeLong()
                        .combineFunctionMin()
                        .build();
        this.combiner =
                AbstractInternalWatermarkDeclaration.from(declaration)
                        .createWatermarkCombiner(2, (Runnable) null);

        // Step | channel 0 | channel 1 | emitted so far
        //  1   |     2     |           | [2]
        //  2   |           |     1     | [2, 1]
        //  3   |     3     |           | [2, 1]
        //  4   |           |     1     | [2, 1]
        //  5   |           |     2     | [2, 1, 2]
        //  6   |           |     4     | [2, 1, 2, 3]
        executeAndCheckCombineStepWithLongWatermark(declaration.newWatermark(2L), 0, 2L);
        executeAndCheckCombineStepWithLongWatermark(declaration.newWatermark(1L), 1, 2L, 1L);
        executeAndCheckCombineStepWithLongWatermark(declaration.newWatermark(3L), 0, 2L, 1L);
        executeAndCheckCombineStepWithLongWatermark(declaration.newWatermark(1L), 1, 2L, 1L);
        executeAndCheckCombineStepWithLongWatermark(
                declaration.newWatermark(2L), 1, 2L, 1L, 2L);
        executeAndCheckCombineStepWithLongWatermark(
                declaration.newWatermark(4L), 1, 2L, 1L, 2L, 3L);
    }

    /**
     * Verifies an AND bool combiner that waits for all channels: nothing is emitted until both
     * channels have delivered a watermark.
     */
    @Test
    void testBoolWatermarkCombinerWaitForAllChannels() throws Exception {
        BoolWatermarkDeclaration declaration =
                WatermarkDeclarations.newBuilder(DEFAULT_WATERMARK_IDENTIFIER)
                        .typeBool()
                        .combineFunctionAND()
                        .combineWaitForAllChannels(true)
                        .build();
        this.combiner =
                AbstractInternalWatermarkDeclaration.from(declaration)
                        .createWatermarkCombiner(2, (Runnable) null);

        // Step | channel 0 | channel 1 | emitted so far
        //  1   |   true    |           | []
        //  2   |   true    |           | []
        //  3   |           |   false   | [false]
        //  4   |           |   true    | [false, true]
        executeAndCheckCombineStepWithBoolWatermark(declaration.newWatermark(true), 0);
        executeAndCheckCombineStepWithBoolWatermark(declaration.newWatermark(true), 0);
        executeAndCheckCombineStepWithBoolWatermark(declaration.newWatermark(false), 1, false);
        executeAndCheckCombineStepWithBoolWatermark(
                declaration.newWatermark(true), 1, false, true);
    }

    /** Verifies an AND bool combiner that does not wait for all channels. */
    @Test
    void testBoolWatermarkCombinerCombineAnd() throws Exception {
        BoolWatermarkDeclaration declaration =
                WatermarkDeclarations.newBuilder(DEFAULT_WATERMARK_IDENTIFIER)
                        .typeBool()
                        .combineFunctionAND()
                        .build();
        this.combiner =
                AbstractInternalWatermarkDeclaration.from(declaration)
                        .createWatermarkCombiner(2, (Runnable) null);

        // Step | channel 0 | channel 1 | emitted so far
        //  1   |   true    |           | [true]
        //  2   |           |   false   | [true, false]
        //  3   |           |   true    | [true, false, true]
        //  4   |   true    |           | [true, false, true]
        executeAndCheckCombineStepWithBoolWatermark(declaration.newWatermark(true), 0, true);
        executeAndCheckCombineStepWithBoolWatermark(
                declaration.newWatermark(false), 1, true, false);
        executeAndCheckCombineStepWithBoolWatermark(
                declaration.newWatermark(true), 1, true, false, true);
        executeAndCheckCombineStepWithBoolWatermark(
                declaration.newWatermark(true), 0, true, false, true);
    }

    /** Verifies an OR bool combiner that does not wait for all channels. */
    @Test
    void testBoolWatermarkCombinerCombineOr() throws Exception {
        BoolWatermarkDeclaration declaration =
                WatermarkDeclarations.newBuilder(DEFAULT_WATERMARK_IDENTIFIER)
                        .typeBool()
                        .combineFunctionOR()
                        .build();
        this.combiner =
                AbstractInternalWatermarkDeclaration.from(declaration)
                        .createWatermarkCombiner(2, (Runnable) null);

        // Step | channel 0 | channel 1 | emitted so far
        //  1   |   true    |           | [true]
        //  2   |           |   false   | [true]
        //  3   |           |   true    | [true]
        //  4   |           |   false   | [true]
        //  5   |   false   |           | [true, false]
        executeAndCheckCombineStepWithBoolWatermark(declaration.newWatermark(true), 0, true);
        executeAndCheckCombineStepWithBoolWatermark(declaration.newWatermark(false), 1, true);
        executeAndCheckCombineStepWithBoolWatermark(declaration.newWatermark(true), 1, true);
        executeAndCheckCombineStepWithBoolWatermark(declaration.newWatermark(false), 1, true);
        executeAndCheckCombineStepWithBoolWatermark(
                declaration.newWatermark(false), 0, true, false);
    }

    /**
     * Feeds one long watermark into the combiner and asserts everything emitted so far.
     *
     * @param watermark the watermark handed to the combiner
     * @param channelIndex the index passed to the combiner together with the watermark
     * @param expectedValues values of all watermarks emitted since the start of the test, in
     *     order; pass nothing to assert that no watermark has been emitted yet
     * @throws Exception if the combiner fails
     */
    private void executeAndCheckCombineStepWithLongWatermark(
            LongWatermark watermark, int channelIndex, Long... expectedValues) throws Exception {
        Objects.requireNonNull(
                this.combiner, "the test must create the combiner before combining watermarks");
        // Collect every emitted watermark so the cumulative output can be asserted.
        this.combiner.combineWatermark(watermark, channelIndex, this.receivedWatermarks::add);
        Assertions.assertThat(
                        this.receivedWatermarks.stream()
                                .map(received -> ((LongWatermark) received).getValue()))
                .containsExactly(expectedValues);
    }

    /**
     * Feeds one bool watermark into the combiner and asserts everything emitted so far.
     *
     * @param watermark the watermark handed to the combiner
     * @param channelIndex the index passed to the combiner together with the watermark
     * @param expectedValues values of all watermarks emitted since the start of the test, in
     *     order; pass nothing to assert that no watermark has been emitted yet
     * @throws Exception if the combiner fails
     */
    private void executeAndCheckCombineStepWithBoolWatermark(
            BoolWatermark watermark, int channelIndex, Boolean... expectedValues)
            throws Exception {
        Objects.requireNonNull(
                this.combiner, "the test must create the combiner before combining watermarks");
        // Collect every emitted watermark so the cumulative output can be asserted.
        this.combiner.combineWatermark(watermark, channelIndex, this.receivedWatermarks::add);
        Assertions.assertThat(
                        this.receivedWatermarks.stream()
                                .map(received -> ((BoolWatermark) received).getValue()))
                .containsExactly(expectedValues);
    }
}
