/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.concurrent.Future;
import java.util.function.Supplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.mocks.MockSource;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.MultipleInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.MultipleInputStreamTaskTest;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/**
 * Checkpointing tests for a {@link MultipleInputStreamTask} test harness that is fed both
 * "source" records (via {@code MultipleInputStreamTaskTest.addSourceRecords}) and network
 * records/barriers (via {@code processElement}/{@code processEvent}).
 *
 * <p>Each test drives the task's mailbox manually ({@code setAutoProcess(false)}) and then checks
 * the relative position of the emitted {@link CheckpointBarrier} and the emitted records.
 */
public class MultipleInputStreamTaskChainedSourcesCheckpointingTest {
    /** Upper bound on mailbox steps executed by {@link #processSingleStepUntil}. */
    private static final int MAX_STEPS = 100;

    /**
     * Timestamp attached to every record created by this test.
     *
     * <p>NOTE(review): presumably Flink's "no timestamp" sentinel; confirm against
     * {@code TimestampAssigner.NO_TIMESTAMP}.
     */
    private static final long RECORD_TIMESTAMP = Long.MIN_VALUE;

    /** Checkpoint id/timestamp shared by the RPC trigger and the barriers built in the tests. */
    private final CheckpointMetaData metaData = new CheckpointMetaData(1L, System.currentTimeMillis());

    /**
     * Records and barriers are enqueued on both network inputs, then the checkpoint is triggered
     * through {@code triggerCheckpointAsync}. Once the checkpoint future completes, the output
     * must start with the four network records (any order) immediately followed by the barrier.
     */
    @Test
    public void testSourceCheckpointFirst() throws Exception {
        try (StreamTaskMailboxTestHarness<String> testHarness = MultipleInputStreamTaskTest.buildTestHarness()) {
            testHarness.setAutoProcess(false);
            ArrayDeque<Object> expectedOutput = new ArrayDeque<>();
            CheckpointBarrier barrier = createBarrier(testHarness);
            addRecordsAndBarriers(testHarness, barrier);

            Future<?> checkpointFuture =
                    testHarness.getStreamTask().triggerCheckpointAsync(metaData, barrier.getCheckpointOptions());
            processSingleStepUntil(testHarness, checkpointFuture::isDone);

            addExpectedNetworkOutput(expectedOutput);
            assertRecordsThenBarrier(testHarness, expectedOutput, barrier);
        }
    }

    /**
     * Same trigger order as {@link #testSourceCheckpointFirst()} but with the harness built in
     * unaligned mode and without network barriers. When the checkpoint future completes the
     * output must contain only the barrier; after draining the mailbox, the four network records
     * (any order) must directly follow it.
     */
    @Test
    public void testSourceCheckpointFirstUnaligned() throws Exception {
        boolean unaligned = true;
        try (StreamTaskMailboxTestHarness<String> testHarness = MultipleInputStreamTaskTest.buildTestHarness(unaligned)) {
            testHarness.setAutoProcess(false);
            ArrayDeque<Object> expectedOutput = new ArrayDeque<>();
            addRecords(testHarness);

            CheckpointBarrier barrier = createBarrier(testHarness);
            Future<?> checkpointFuture =
                    testHarness.getStreamTask().triggerCheckpointAsync(metaData, barrier.getCheckpointOptions());
            processSingleStepUntil(testHarness, checkpointFuture::isDone);

            // Nothing but the barrier may have been emitted at this point.
            MatcherAssert.assertThat(testHarness.getOutput(), Matchers.<Object>contains(barrier));

            testHarness.processAll();

            addExpectedNetworkOutput(expectedOutput);
            ArrayList<Object> actualOutput = new ArrayList<>(testHarness.getOutput());
            // Index 0 is the barrier asserted above; the records follow it.
            MatcherAssert.assertThat(
                    actualOutput.subList(1, expectedOutput.size() + 1),
                    Matchers.containsInAnyOrder(expectedOutput.toArray()));
        }
    }

    /**
     * Records and network barriers are enqueued and the mailbox is drained <em>before</em> the
     * checkpoint is triggered through {@code triggerCheckpointAsync}. Once the checkpoint future
     * completes, the output must start with the three "42" records and the four network records
     * (any order) immediately followed by the barrier.
     */
    @Test
    public void testSourceCheckpointLast() throws Exception {
        try (StreamTaskMailboxTestHarness<String> testHarness = MultipleInputStreamTaskTest.buildTestHarness()) {
            testHarness.setAutoProcess(false);
            ArrayDeque<Object> expectedOutput = new ArrayDeque<>();
            CheckpointBarrier barrier = createBarrier(testHarness);
            addRecordsAndBarriers(testHarness, barrier);

            testHarness.processAll();

            Future<?> checkpointFuture =
                    testHarness.getStreamTask().triggerCheckpointAsync(metaData, barrier.getCheckpointOptions());
            processSingleStepUntil(testHarness, checkpointFuture::isDone);

            expectedOutput.add(new StreamRecord<>("42", RECORD_TIMESTAMP));
            expectedOutput.add(new StreamRecord<>("42", RECORD_TIMESTAMP));
            expectedOutput.add(new StreamRecord<>("42", RECORD_TIMESTAMP));
            addExpectedNetworkOutput(expectedOutput);
            assertRecordsThenBarrier(testHarness, expectedOutput, barrier);
        }
    }

    /**
     * Unaligned harness: network records and barriers are processed first, then three source
     * records (1337) are added and the mailbox is drained again. No checkpoint is triggered via
     * {@code triggerCheckpointAsync} in this test. The complete output must consist of exactly
     * the four network records and the barrier, in any order.
     */
    @Test
    public void testSourceCheckpointLastUnaligned() throws Exception {
        boolean unaligned = true;
        try (StreamTaskMailboxTestHarness<String> testHarness = MultipleInputStreamTaskTest.buildTestHarness(unaligned)) {
            testHarness.setAutoProcess(false);
            ArrayDeque<Object> expectedOutput = new ArrayDeque<>();

            addNetworkRecords(testHarness);
            CheckpointBarrier barrier = createBarrier(testHarness);
            addBarriers(testHarness, barrier);
            testHarness.processAll();

            MultipleInputStreamTaskTest.addSourceRecords(testHarness, 1, 1337, 1337, 1337);
            testHarness.processAll();

            addExpectedNetworkOutput(expectedOutput);
            expectedOutput.add(barrier);

            MatcherAssert.assertThat(
                    testHarness.getOutput(), Matchers.containsInAnyOrder(expectedOutput.toArray()));
        }
    }

    /**
     * Harness with a single {@link MockSource} input and no network inputs. Three source records
     * are added, the task is stepped until the first output appears, and then a checkpoint is
     * triggered. The output must start with record "42" immediately followed by the barrier.
     */
    @Test
    public void testOnlyOneSource() throws Exception {
        try (StreamTaskMailboxTestHarness<String> testHarness =
                new StreamTaskMailboxTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
                        .modifyExecutionConfig(config -> config.enableObjectReuse())
                        .addSourceInput(
                                new SourceOperatorFactory<>(
                                        new MockSource(Boundedness.BOUNDED, 1),
                                        WatermarkStrategy.noWatermarks()))
                        .setupOutputForSingletonOperatorChain(
                                new MultipleInputStreamTaskTest.MapToStringMultipleInputOperatorFactory(1))
                        .build()) {
            testHarness.setAutoProcess(false);
            ArrayDeque<Object> expectedOutput = new ArrayDeque<>();

            MultipleInputStreamTaskTest.addSourceRecords(testHarness, 0, 42, 43, 44);
            // Step only until the first record has been emitted, so the checkpoint is
            // triggered while later source records have not yet reached the output.
            processSingleStepUntil(testHarness, () -> !testHarness.getOutput().isEmpty());
            expectedOutput.add(new StreamRecord<>("42", RECORD_TIMESTAMP));

            CheckpointBarrier barrier = createBarrier(testHarness);
            Future<?> checkpointFuture =
                    testHarness.getStreamTask().triggerCheckpointAsync(metaData, barrier.getCheckpointOptions());
            processSingleStepUntil(testHarness, checkpointFuture::isDone);

            assertRecordsThenBarrier(testHarness, expectedOutput, barrier);
        }
    }

    /**
     * Appends the output records expected for the elements enqueued by
     * {@link #addNetworkRecords}: two "44" and two "47.0" records.
     */
    private static void addExpectedNetworkOutput(ArrayDeque<Object> expectedOutput) {
        expectedOutput.add(new StreamRecord<>("44", RECORD_TIMESTAMP));
        expectedOutput.add(new StreamRecord<>("44", RECORD_TIMESTAMP));
        expectedOutput.add(new StreamRecord<>("47.0", RECORD_TIMESTAMP));
        expectedOutput.add(new StreamRecord<>("47.0", RECORD_TIMESTAMP));
    }

    /**
     * Asserts that the harness output starts with {@code expectedRecords} (in any order) and that
     * the element right after them equals {@code barrier}. Anything emitted after the barrier is
     * not inspected.
     */
    private static void assertRecordsThenBarrier(
            StreamTaskMailboxTestHarness<String> testHarness,
            ArrayDeque<Object> expectedRecords,
            CheckpointBarrier barrier) {
        ArrayList<Object> actualOutput = new ArrayList<>(testHarness.getOutput());
        MatcherAssert.assertThat(
                actualOutput.subList(0, expectedRecords.size()),
                Matchers.containsInAnyOrder(expectedRecords.toArray()));
        MatcherAssert.assertThat(
                actualOutput.get(expectedRecords.size()), Matchers.<Object>equalTo(barrier));
    }

    /** Enqueues the source and network records, then the barrier on both network inputs. */
    private void addRecordsAndBarriers(StreamTaskMailboxTestHarness<String> testHarness, CheckpointBarrier checkpointBarrier) throws Exception {
        addRecords(testHarness);
        addBarriers(testHarness, checkpointBarrier);
    }

    /**
     * Builds a barrier for {@link #metaData} whose checkpoint options are derived from the
     * task's own {@link StreamConfig} (exactly-once mode, unaligned flag, alignment timeout).
     */
    private CheckpointBarrier createBarrier(StreamTaskMailboxTestHarness<String> testHarness) {
        StreamConfig config = testHarness.getStreamTask().getConfiguration();
        CheckpointOptions checkpointOptions =
                CheckpointOptions.forConfig(
                        CheckpointType.CHECKPOINT,
                        CheckpointStorageLocationReference.getDefault(),
                        config.isExactlyOnceCheckpointMode(),
                        config.isUnalignedCheckpointsEnabled(),
                        config.getAlignmentTimeout().toMillis());
        return new CheckpointBarrier(metaData.getCheckpointId(), metaData.getTimestamp(), checkpointOptions);
    }

    /** Sends the barrier to network inputs 0 and 1. */
    private void addBarriers(StreamTaskMailboxTestHarness<String> testHarness, CheckpointBarrier checkpointBarrier) throws Exception {
        testHarness.processEvent(checkpointBarrier, 0);
        testHarness.processEvent(checkpointBarrier, 1);
    }

    /**
     * Adds three source records with value 42 plus the network records.
     *
     * <p>NOTE(review): the {@code 1} passed to {@code addSourceRecords} is presumably the source
     * input index; confirm against {@code MultipleInputStreamTaskTest}.
     */
    private void addRecords(StreamTaskMailboxTestHarness<String> testHarness) throws Exception {
        MultipleInputStreamTaskTest.addSourceRecords(testHarness, 1, 42, 42, 42);
        addNetworkRecords(testHarness);
    }

    /** Enqueues two String "44" records on network input 0 and two Double 47.0 records on input 1. */
    private void addNetworkRecords(StreamTaskMailboxTestHarness<String> testHarness) throws Exception {
        testHarness.processElement(new StreamRecord<>("44", RECORD_TIMESTAMP), 0);
        testHarness.processElement(new StreamRecord<>("44", RECORD_TIMESTAMP), 0);
        testHarness.processElement(new StreamRecord<>(47.0, RECORD_TIMESTAMP), 1);
        testHarness.processElement(new StreamRecord<>(47.0, RECORD_TIMESTAMP), 1);
    }

    /**
     * Executes single mailbox steps until {@code condition} becomes true, giving up after
     * {@link #MAX_STEPS} steps.
     *
     * <p>Fails if the condition already holds before the first step (the test would not be
     * exercising anything) or still does not hold once the step budget is exhausted.
     */
    private void processSingleStepUntil(StreamTaskMailboxTestHarness<String> testHarness, Supplier<Boolean> condition) throws Exception {
        Assert.assertFalse("Condition was already satisfied before processing any step", condition.get());
        for (int step = 0; step < MAX_STEPS && !condition.get(); ++step) {
            testHarness.processSingleStep();
        }
        Assert.assertTrue("Condition was not satisfied within " + MAX_STEPS + " steps", condition.get());
    }
}

