/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.sink;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.connector.sink.Committer;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.BatchCommitterOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.TestSink;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Test;

public class BatchCommitterOperatorTest
extends TestLogger {
    @Test(expected=IllegalStateException.class)
    public void throwExceptionWithoutCommitter() throws Exception {
        OneInputStreamOperatorTestHarness<String, String> testHarness = this.createTestHarness(null);
        testHarness.initializeEmptyState();
    }

    @Test(expected=UnsupportedOperationException.class)
    public void doNotSupportRetry() throws Exception {
        OneInputStreamOperatorTestHarness<String, String> testHarness = this.createTestHarness(new TestSink.AlwaysRetryCommitter());
        testHarness.initializeEmptyState();
        testHarness.open();
        testHarness.processElement((StreamRecord<String>)new StreamRecord((Object)"those"));
        testHarness.endInput();
        testHarness.close();
    }

    @Test
    public void commit() throws Exception {
        TestSink.DefaultCommitter committer = new TestSink.DefaultCommitter();
        OneInputStreamOperatorTestHarness<String, String> testHarness = this.createTestHarness(committer);
        List<String> expectedCommittedData = Arrays.asList("youth", "laugh", "nothing");
        testHarness.initializeEmptyState();
        testHarness.open();
        testHarness.processElements(expectedCommittedData.stream().map(StreamRecord::new).collect(Collectors.toList()));
        testHarness.endInput();
        testHarness.close();
        MatcherAssert.assertThat(committer.getCommittedData(), (Matcher)Matchers.containsInAnyOrder((Object[])expectedCommittedData.toArray()));
        MatcherAssert.assertThat(testHarness.getOutput(), (Matcher)Matchers.containsInAnyOrder((Object[])expectedCommittedData.stream().map(StreamRecord::new).toArray()));
    }

    @Test
    public void close() throws Exception {
        TestSink.DefaultCommitter committer = new TestSink.DefaultCommitter();
        OneInputStreamOperatorTestHarness<String, String> testHarness = this.createTestHarness(committer);
        testHarness.initializeEmptyState();
        testHarness.open();
        testHarness.close();
        MatcherAssert.assertThat((Object)committer.isClosed(), (Matcher)Matchers.is((Object)true));
    }

    private OneInputStreamOperatorTestHarness<String, String> createTestHarness(Committer<String> committer) throws Exception {
        return new OneInputStreamOperatorTestHarness<String, String>((OneInputStreamOperatorFactory<String, String>)new BatchCommitterOperatorFactory((Sink)TestSink.newBuilder().setCommitter(committer).setCommittableSerializer(TestSink.StringCommittableSerializer.INSTANCE).build()), (TypeSerializer<String>)StringSerializer.INSTANCE);
    }
}

