/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.sink;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.connector.sink.GlobalCommitter;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.StreamingGlobalCommitterOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.TestSink;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

public class StreamingGlobalCommitterOperatorTest
extends TestLogger {
    @Test(expected=IllegalStateException.class)
    public void throwExceptionWithoutSerializer() throws Exception {
        OneInputStreamOperatorTestHarness<String, String> testHarness = this.createTestHarness(new TestSink.DefaultGlobalCommitter(), null);
        testHarness.initializeEmptyState();
        testHarness.open();
    }

    @Test(expected=IllegalStateException.class)
    public void throwExceptionWithoutCommitter() throws Exception {
        OneInputStreamOperatorTestHarness<String, String> testHarness = this.createTestHarness(null, TestSink.StringCommittableSerializer.INSTANCE);
        testHarness.initializeEmptyState();
        testHarness.open();
    }

    @Test(expected=UnsupportedOperationException.class)
    public void doNotSupportRetry() throws Exception {
        List<String> input = Arrays.asList("lazy", "leaf");
        OneInputStreamOperatorTestHarness<String, String> testHarness = this.createTestHarness(new TestSink.AlwaysRetryGlobalCommitter());
        testHarness.initializeEmptyState();
        testHarness.open();
        testHarness.processElements(input.stream().map(StreamRecord::new).collect(Collectors.toList()));
        testHarness.snapshot(1L, 1L);
        testHarness.notifyOfCompletedCheckpoint(1L);
        testHarness.close();
    }

    @Test
    public void closeCommitter() throws Exception {
        TestSink.DefaultGlobalCommitter globalCommitter = new TestSink.DefaultGlobalCommitter();
        OneInputStreamOperatorTestHarness<String, String> testHarness = this.createTestHarness(globalCommitter);
        testHarness.initializeEmptyState();
        testHarness.open();
        testHarness.close();
        MatcherAssert.assertThat((Object)globalCommitter.isClosed(), (Matcher)Matchers.is((Object)true));
    }

    @Test
    public void restoredFromMergedState() throws Exception {
        List<String> input1 = Arrays.asList("host", "drop");
        OperatorSubtaskState operatorSubtaskState1 = TestHarnessUtil.buildSubtaskState(this.createTestHarness(), input1);
        List<String> input2 = Arrays.asList("future", "evil", "how");
        OperatorSubtaskState operatorSubtaskState2 = TestHarnessUtil.buildSubtaskState(this.createTestHarness(), input2);
        TestSink.DefaultGlobalCommitter globalCommitter = new TestSink.DefaultGlobalCommitter();
        OneInputStreamOperatorTestHarness<String, String> testHarness = this.createTestHarness(globalCommitter);
        OperatorSubtaskState mergedOperatorSubtaskState = OneInputStreamOperatorTestHarness.repackageState(operatorSubtaskState1, operatorSubtaskState2);
        testHarness.initializeState(OneInputStreamOperatorTestHarness.repartitionOperatorState(mergedOperatorSubtaskState, 2, 2, 1, 0));
        testHarness.open();
        ArrayList<String> expectedOutput = new ArrayList<String>();
        expectedOutput.add(TestSink.DefaultGlobalCommitter.COMBINER.apply(input1));
        expectedOutput.add(TestSink.DefaultGlobalCommitter.COMBINER.apply(input2));
        testHarness.snapshot(1L, 1L);
        testHarness.notifyOfCompletedCheckpoint(1L);
        testHarness.close();
        MatcherAssert.assertThat(globalCommitter.getCommittedData(), (Matcher)Matchers.containsInAnyOrder((Object[])expectedOutput.toArray()));
    }

    @Test
    public void commitMultipleStagesTogether() throws Exception {
        TestSink.DefaultGlobalCommitter globalCommitter = new TestSink.DefaultGlobalCommitter();
        List<String> input1 = Arrays.asList("cautious", "nature");
        List<String> input2 = Arrays.asList("count", "over");
        List<String> input3 = Arrays.asList("lawyer", "grammar");
        ArrayList<String> expectedOutput = new ArrayList<String>();
        expectedOutput.add(TestSink.DefaultGlobalCommitter.COMBINER.apply(input1));
        expectedOutput.add(TestSink.DefaultGlobalCommitter.COMBINER.apply(input2));
        expectedOutput.add(TestSink.DefaultGlobalCommitter.COMBINER.apply(input3));
        OneInputStreamOperatorTestHarness<String, String> testHarness = this.createTestHarness(globalCommitter);
        testHarness.initializeEmptyState();
        testHarness.open();
        testHarness.processElements(input1.stream().map(StreamRecord::new).collect(Collectors.toList()));
        testHarness.snapshot(1L, 1L);
        testHarness.processElements(input2.stream().map(StreamRecord::new).collect(Collectors.toList()));
        testHarness.snapshot(2L, 2L);
        testHarness.processElements(input3.stream().map(StreamRecord::new).collect(Collectors.toList()));
        testHarness.snapshot(3L, 3L);
        testHarness.notifyOfCompletedCheckpoint(3L);
        testHarness.close();
        MatcherAssert.assertThat(testHarness.getOutput(), (Matcher)Matchers.containsInAnyOrder((Object[])expectedOutput.stream().map(StreamRecord::new).toArray()));
        MatcherAssert.assertThat(globalCommitter.getCommittedData(), (Matcher)Matchers.containsInAnyOrder((Object[])expectedOutput.toArray()));
    }

    @Test
    public void filterRecoveredCommittables() throws Exception {
        List<String> input = Arrays.asList("silent", "elder", "patience");
        String successCommittedCommittable = TestSink.DefaultGlobalCommitter.COMBINER.apply(input);
        OperatorSubtaskState operatorSubtaskState = TestHarnessUtil.buildSubtaskState(this.createTestHarness(), input);
        TestSink.DefaultGlobalCommitter globalCommitter = new TestSink.DefaultGlobalCommitter(successCommittedCommittable);
        OneInputStreamOperatorTestHarness<String, String> testHarness = this.createTestHarness(globalCommitter);
        testHarness.initializeState(operatorSubtaskState);
        testHarness.open();
        testHarness.snapshot(1L, 1L);
        testHarness.notifyOfCompletedCheckpoint(1L);
        Assert.assertTrue((boolean)globalCommitter.getCommittedData().isEmpty());
        testHarness.close();
    }

    @Test
    public void endOfInput() throws Exception {
        TestSink.DefaultGlobalCommitter globalCommitter = new TestSink.DefaultGlobalCommitter();
        OneInputStreamOperatorTestHarness<String, String> testHarness = this.createTestHarness(globalCommitter);
        testHarness.initializeEmptyState();
        testHarness.open();
        testHarness.snapshot(1L, 1L);
        testHarness.endInput();
        testHarness.notifyOfCompletedCheckpoint(1L);
        testHarness.close();
        MatcherAssert.assertThat(globalCommitter.getCommittedData(), (Matcher)Matchers.contains((Object[])new String[]{"end of input"}));
    }

    private OneInputStreamOperatorTestHarness<String, String> createTestHarness() throws Exception {
        return this.createTestHarness(new TestSink.DefaultGlobalCommitter(), TestSink.StringCommittableSerializer.INSTANCE);
    }

    private OneInputStreamOperatorTestHarness<String, String> createTestHarness(GlobalCommitter<String, String> globalCommitter) throws Exception {
        return this.createTestHarness(globalCommitter, TestSink.StringCommittableSerializer.INSTANCE);
    }

    private OneInputStreamOperatorTestHarness<String, String> createTestHarness(GlobalCommitter<String, String> globalCommitter, SimpleVersionedSerializer<String> serializer) throws Exception {
        return new OneInputStreamOperatorTestHarness<String, String>((OneInputStreamOperatorFactory<String, String>)new StreamingGlobalCommitterOperatorFactory((Sink)TestSink.newBuilder().setGlobalCommitter(globalCommitter).setGlobalCommittableSerializer(serializer).build()), (TypeSerializer<String>)StringSerializer.INSTANCE);
    }
}

